In [0]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the data paths configured later are relative
# ("drive/My Drive/..."), so they presumably rely on the working directory
# being /content -- confirm.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
import os
import json
import itertools

import numpy as np
from keras.models import Sequential
from keras.optimizers import Adam
from keras.utils import np_utils
from sklearn import metrics
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping

from keras.layers import Dense, Dropout
from keras.layers import Flatten, Conv1D, LeakyReLU, BatchNormalization
from keras.layers import LSTM

In [0]:
# paths
# Project root on the mounted drive; inputs are read from `preprocessed`
# and grid-search result files are written to `gridsearch`.
directory = "drive/My Drive/UE_Proell"
preprocessed = f"{directory}/preprocessed"
gridsearch = f"{directory}/gridsearch"

# hyper parameter
# Fixed settings shared by every training run.
loss = 'categorical_crossentropy'
leaky_alpha = 0.01  # alpha passed to the LeakyReLU layers of the CNNs
epochs = 150        # upper bound on epochs; early stopping may end a run sooner

# Training hyper parameters explored by the grid search (full cartesian product).
learning_rates = [0.01, 0.001]
dropout_rates  = [0.2, 0.5]
batch_sizes    = [10, 20, 40]

# Architecture hyper parameters explored by the model builder functions.
neurons    = [128, 256, 512]  # Dense layer widths
kernels    = [3, 5, 7]        # Conv1D kernel sizes
lstm_units = [16, 32, 48]     # LSTM layer sizes

# configuration parameter
validation_split = 0.2  # fraction of the training data held out for validation
patience = 5            # EarlyStopping patience (epochs without improvement)
verbose = 0             # verbosity passed to fit() and EarlyStopping
seed = 42               # random_state for the train/validation split

# labels
# Human-readable class names.
# NOTE(review): not referenced by any other cell visible here -- presumably
# the order matches the columns of the one-hot label arrays; confirm.
labels = [
    "air conditioner",
    "car horn",
    "children playing",
    "dog bark",
    "drilling",
    "engine idling",
    "gun shot",
    "jackhammer",
    "siren",
    "street music"
]

In [0]:
# Make sure the output folder for the grid-search reports exists.
os.makedirs(gridsearch, exist_ok=True)

# Load the preprocessed feature / label arrays from disk.
X_train = np.load(f"{preprocessed}/X_train.npy")
y_train = np.load(f"{preprocessed}/y_train.npy")
X_test = np.load(f"{preprocessed}/X_test.npy")
y_test = np.load(f"{preprocessed}/y_test.npy")

# Carve a stratified validation set out of the training data; the seed
# keeps the split reproducible across runs.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train,
    y_train,
    test_size=validation_split,
    random_state=seed,
    stratify=y_train,
)

# The second axis of the label array gives the number of output classes.
num_of_labels = y_train.shape[1]

In [0]:
def getDNNs(input_shape, dropout):
  """Build every fully connected candidate network for one dropout rate.

  One model is created per width in `neurons` (single hidden layer),
  followed by one model per ordered pair of widths (two hidden layers).
  Every hidden Dense layer uses ReLU and is followed by Dropout; the
  output layer is a softmax over `num_of_labels` classes.

  Args:
    input_shape: shape tuple passed to the first Dense layer.
    dropout: rate passed to every Dropout layer.

  Returns:
    A list of dicts with keys "name" (architecture label) and "model"
    (an uncompiled Sequential model), single-layer models first.
  """
  def _build(hidden_sizes):
    # Stack Dense+Dropout pairs; only the first layer declares the input shape.
    net = Sequential()
    for position, width in enumerate(hidden_sizes):
      if position == 0:
        net.add(Dense(width, input_shape=input_shape, activation='relu'))
      else:
        net.add(Dense(width, activation='relu'))
      net.add(Dropout(dropout))
    net.add(Dense(num_of_labels, activation='softmax'))
    return net

  one_layer = [
      {"name": f"l1_{width}_relu", "model": _build((width,))}
      for width in neurons
  ]
  two_layer = [
      {"name": f"l2_{first}_{second}_relu", "model": _build((first, second))}
      for first, second in itertools.product(neurons, neurons)
  ]
  return one_layer + two_layer

In [0]:
def getCNNs(input_shape, dropout):
  """Build every 1D-convolutional candidate network for one dropout rate.

  For each Dense width in `neurons`, one model is created per kernel size
  in `kernels` (one conv block), followed by one model per ordered pair of
  kernel sizes (two conv blocks). A conv block is a single-filter Conv1D,
  a LeakyReLU and a BatchNormalization layer. The blocks are followed by
  Flatten, Dropout, a ReLU Dense layer and a softmax output layer.

  Args:
    input_shape: shape tuple passed to the first Conv1D layer.
    dropout: rate passed to the Dropout layer.

  Returns:
    A list of dicts with keys "name" (architecture label) and "model"
    (an uncompiled Sequential model).
  """
  def _build(kernel_sizes, dense_units):
    # One conv block per kernel size; only the first declares the input shape.
    net = Sequential()
    for position, size in enumerate(kernel_sizes):
      if position == 0:
        net.add(Conv1D(filters=1, kernel_size=size, input_shape=input_shape))
      else:
        net.add(Conv1D(filters=1, kernel_size=size))
      net.add(LeakyReLU(alpha=leaky_alpha))
      net.add(BatchNormalization())
    net.add(Flatten())
    net.add(Dropout(dropout))
    net.add(Dense(dense_units, activation='relu'))
    net.add(Dense(num_of_labels, activation='softmax'))
    return net

  candidates = []
  for units in neurons:
    # Single conv block variants for this Dense width.
    candidates.extend(
        {"name": f"l1_k{size}_d{units}_relu", "model": _build((size,), units)}
        for size in kernels
    )
    # Two conv block variants for this Dense width.
    candidates.extend(
        {"name": f"l2_k{first}_k{second}_d{units}_relu",
         "model": _build((first, second), units)}
        for first, second in itertools.product(kernels, kernels)
    )
  return candidates

In [0]:
def getLSTMs(input_shape, dropout):
  """Build every single-layer LSTM candidate network for one dropout rate.

  One model is created per size in `lstm_units`: an LSTM layer, a Dropout
  layer and a softmax output layer over `num_of_labels` classes.

  Args:
    input_shape: shape tuple passed to the LSTM layer.
    dropout: rate passed to the Dropout layer.

  Returns:
    A list of dicts with keys "name" (architecture label) and "model"
    (an uncompiled Sequential model).
  """
  def _build(units):
    net = Sequential()
    net.add(LSTM(units, input_shape=input_shape))
    net.add(Dropout(dropout))
    net.add(Dense(num_of_labels, activation='softmax'))
    return net

  # NOTE(review): the label ends in "_relu" although no activation argument
  # is passed to the LSTM layer; kept because it ends up in the result files.
  return [
      {"name": f"l1_lstm{units}_relu", "model": _build(units)}
      for units in lstm_units
  ]

In [0]:
# Registry of the network families covered by the grid search.
#   "method":      builds all candidate architectures for one dropout rate
#   "tests":       number of architectures "method" returns (progress output)
#   "expand_axis": axis inserted into the feature arrays so they match the
#                  family's input shape (None = use the arrays unchanged)
nn_methods = {
    "dnn": {
        "method": lambda dropout: getDNNs((40, ), dropout),
        "tests": (len(neurons) + len(neurons) * len(neurons)),
        "expand_axis": None
    },
    "cnn": {
        "method": lambda dropout: getCNNs((40, 1), dropout),
        "tests":  (len(neurons) * len(kernels) + len(neurons) * len(kernels) * len(kernels)),
        "expand_axis": 2
    },
    "lstm": {
        "method": lambda dropout: getLSTMs((1, 40), dropout),
        "tests":  (len(lstm_units)),
        "expand_axis": 1
    }
}

# Class indices of the test labels; identical for every run, so computed once.
y_test_indices = np.argmax(y_test, axis=1)

for network, n_dict in nn_methods.items():
  results = []

  counter = 1
  tests   = len(learning_rates) * len(dropout_rates) * len(batch_sizes) * n_dict["tests"]

  # The input layout depends only on the network family, so reshape once per
  # family instead of once per model. np.expand_dims does not modify its
  # input, so the original arrays stay untouched for the next family.
  X = {"train": X_train, "test": X_test, "valid": X_valid}
  expand_axis = n_dict["expand_axis"]
  if expand_axis is not None:
    X = {split: np.expand_dims(data, axis=expand_axis) for split, data in X.items()}

  for learning_rate, dropout, batch_size in itertools.product(learning_rates, dropout_rates, batch_sizes):
    # Fresh, untrained models for this hyper parameter combination.
    nns = n_dict["method"](dropout)

    for nn in nns:
      architecture = nn["name"]

      print(f"[{network}][{counter}/{tests}] Train network {network} -- {architecture} -- lr {learning_rate} -- d {dropout} -- bz {batch_size}")
      counter += 1

      model = nn["model"]
      model.compile(Adam(lr=learning_rate), loss=loss, metrics=['accuracy'])

      # NOTE(review): early stopping watches the *training* loss although
      # validation data is supplied to fit(); monitoring "val_loss" may be
      # what was intended. Left unchanged so results stay comparable.
      callbacks = [
          EarlyStopping(monitor="loss", patience=patience, verbose=verbose, mode="auto"),
      ]

      hist = model.fit(X["train"], y_train,
                       validation_data=(X["valid"], y_valid),
                       batch_size=batch_size,
                       epochs=epochs,
                       shuffle=True,
                       verbose=verbose,
                       callbacks=callbacks)

      # scores is [loss, accuracy]; cast to a built-in float so json.dump
      # works regardless of the scalar type the backend returns.
      scores   = model.evaluate(X["test"], y_test, verbose=1)
      accuracy = float(scores[1]) * 100
      print(f"\ttestaccuracy {accuracy}")

      y_pred_indices = np.argmax(model.predict(X["test"]), axis=1)

      report = metrics.classification_report(y_test_indices, y_pred_indices, labels=np.unique(y_test_indices), digits=3, output_dict=True)
      weighted = report["weighted avg"]
      results.append({
          "network":       network,
          "architecture":  architecture,
          "learning_rate": learning_rate,
          "dropout":       dropout,
          "batch_size":    batch_size,
          "accuracy":      accuracy,
          "precision":     float(weighted["precision"]),
          "recall":        float(weighted["recall"]),
          "f1-score":      float(weighted["f1-score"])
      })

  # One result file per network family.
  with open(f"{gridsearch}/gs_{network}.json", "w") as fh:
    json.dump(results, fh, indent="\t")