In [1]:
import math
import pickle
import keras_tuner
import numpy as np
import librosa as lr
import tensorflow as tf
import tensorflow.keras as keras

from tensorflow.keras import Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

In [2]:
def import_pkl(path):
    data = []
    with (open(path, 'rb')) as openfile:
        while True:
            try:
                data = pickle.load(openfile)
            except EOFError:
                break
    return data['X_train'], data['y_train'], data['X_test'], data['y_test']

In [3]:
train00_X, train00_y, test00_X, test00_y = import_pkl('../data/id_00_raw.pkl')
train02_X, train02_y, test02_X, test02_y = import_pkl('../data/id_02_raw.pkl')
train04_X, train04_y, test04_X, test04_y = import_pkl('../data/id_04_raw.pkl')
train06_X, train06_y, test06_X, test06_y = import_pkl('../data/id_06_raw.pkl')

In [29]:
class MyHyperModel(keras_tuner.HyperModel):
    
    def __init__(self, min_delta):
        self.input_shape_x = 0
        self.input_shape_y = 0
        self.min_delta = min_delta
    
    def build(self, hp):
        
        n_mels = hp.Choice('n_mels', [128, 256, 512])
        hop_length = hp.Choice('hop_length', [128, 256, 512])

        self.input_shape_x = n_mels
        self.input_shape_y = hop_length
               
        inputLayer = Input(shape = (self.input_shape_x, math.ceil((22050 * 10) / self.input_shape_y)))
        h = Flatten()(inputLayer)
        h = Dense(units = hp.Choice('layer 1', [64, 128, 256]), activation = 'relu')(h)
        h = Dense(units = hp.Choice('layer 2', [64, 32, 16]), activation = 'relu')(h)
        h = Dense(units = hp.Choice('layer 3', [4, 8]), activation = 'relu')(h)
        h = Dense(units = hp.Choice('layer 4', [64, 32, 16]), activation = 'relu')(h)
        h = Dense(units = hp.Choice('layer 5', [64, 128, 256]), activation = 'relu')(h)
        h = Dense(self.input_shape_x * math.ceil((22050 * 10) / self.input_shape_y), activation = None)(h)
        h = Reshape((self.input_shape_x, math.ceil((22050 * 10) / self.input_shape_y)))(h)
                  
        Model(inputs = inputLayer, outputs = h).compile(loss = 'mean_squared_error', metrics = ['mse'])

        return Model(inputs = inputLayer, outputs = h)

    def fit(self, hp, model, train, validation, callbacks = None, **kwargs):
        
        def convert_to_melspectrogram(audio_files, sr, n_fft, hop_length, n_mels):
            audio_mel = lr.feature.melspectrogram(y = audio_files[0], sr = sr, n_fft = n_fft, hop_length = hop_length, n_mels = n_mels)
            audio_mel_dbd = lr.amplitude_to_db(abs(audio_mel), ref = np.max)
            shape_0 = audio_mel_dbd.shape[0]
            shape_1 = audio_mel_dbd.shape[1]
            audio_mel_dbd = np.reshape(audio_mel_dbd, [1, shape_0, shape_1])

            for file in audio_files[1:10]:
                audio_mel_temp = lr.feature.melspectrogram(y = file, sr = sr, n_fft = n_fft, hop_length = hop_length, n_mels = n_mels)
                audio_mel_dbd_temp = lr.amplitude_to_db(abs(audio_mel_temp), ref = np.max)
                audio_mel_dbd = np.vstack([audio_mel_dbd, np.reshape(audio_mel_dbd_temp, [1, shape_0, shape_1])])
            return audio_mel_dbd

        train_ds = convert_to_melspectrogram(train, 22050, 1024, self.input_shape_y, self.input_shape_x)
                            
        # Define the optimizer.
        optimizer = keras.optimizers.Adam(hp.Float('learning_rate', 1e-4, 1e-2, sampling = 'log', default = 1e-3))
        loss_fn = keras.losses.MeanSquaredError(reduction = 'auto', name = 'mean_squared_error')

        # The metric to track validation loss.
        epoch_loss_metric = keras.metrics.Mean()

        # Function to run the train step.
        @tf.function
        def run_train_step(train):
            with tf.GradientTape() as tape:
                logits = model(train)
                loss = loss_fn(train, logits)
                # Add any regularization losses.
                if model.losses:
                    loss +=  tf.math.add_n(model.losses)
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Function to run the validation step.
        @tf.function
        def run_val_step(validation):
            logits = model(validation)
            loss = loss_fn(validation, logits)
            # Update the metric.
            epoch_loss_metric.update_state(loss)

        # Assign the model to the callbacks.
        for callback in callbacks:
            callback.model = model

        # Record the best validation loss value
        best_epoch_loss = float("inf")

        # The custom training loop.
#         epochs = hp.Choice('epochs', [25, 50, 75])
        for epoch in range(5):
            print(f"Epoch: {epoch}")

            # Iterate the training data to run the training step.
            for sample in train_ds:
                run_train_step(sample.reshape(1, sample.shape[0], sample.shape[1]))

            # Iterate the validation data to run the validation step.
            for sample in train_ds:
                run_val_step(sample.reshape(1, sample.shape[0], sample.shape[1]))

            # Calling the callbacks after epoch.
            epoch_loss = float(epoch_loss_metric.result().numpy())
            for callback in callbacks:
                # The "val_loss" is the objective passed to the tuner.
                callback.on_epoch_end(epoch, logs = {"val_loss": epoch_loss})
            epoch_loss_metric.reset_states()

            print(f"Epoch loss: {epoch_loss}")
            
            if best_epoch_loss <= (epoch_loss + self.min_delta):
                break
            else:
                best_epoch_loss = min(best_epoch_loss, epoch_loss)

        # Return the evaluation metric value.
        return best_epoch_loss

In [30]:
tuner = keras_tuner.RandomSearch(
    objective = keras_tuner.Objective('val_loss', 'min'),
    seed = 41,
    max_trials = 5,
    hypermodel = MyHyperModel(min_delta = 0),
    directory = '../model/results',
    project_name = 'custom_training',
    overwrite = True,
)

In [31]:
tuner.search(train = train00_X, validation = train00_X)

Trial 2 Complete [00h 00m 57s]
val_loss: 324.7400207519531

Best val_loss So Far: 324.7400207519531
Total elapsed time: 00h 01m 13s

Search: Running Trial #3

Value             |Best Value So Far |Hyperparameter
512               |256               |n_mels
128               |128               |hop_length
128               |128               |layer 1
64                |64                |layer 2
4                 |8                 |layer 3
32                |32                |layer 4
256               |256               |layer 5
0.00077674        |0.00015159        |learning_rate



KeyboardInterrupt: 

In [None]:
best_hps = tuner.get_best_hyperparameters()[0]
print(best_hps.values)

best_model = tuner.get_best_models()[0]
print(best_model.summary())