In [1]:
from itertools import permutations
import sys
import tensorflow.keras.backend as K
import keras.preprocessing.sequence
from keras.utils import Sequence, to_categorical
from keras.layers import concatenate, Dense, Dropout, Layer, Conv1D, Activation, MaxPooling1D, AveragePooling1D
from keras.models import Sequential
from scipy import signal
import pickle
import numpy as np

# Allows me to import my modules
sys.path.append('./modules')
from audio_utils import *

Using TensorFlow backend.


Attempting to read settings file...
	Read successfully!


In [2]:
kltls = ['bass_drum-normal','hi_hat-normal',
  'hi_hat-open',
  'high_tom-normal',
  'ride-normal',
  'ride-bell',
  'crash-normal',
  'snare-normal',
  'low_tom-normal',
  'mid_tom-normal']

In [3]:
def labels_to_ys(labels):
    ys = np.zeros(len(kltls))
    for n in range(len(kltls)):
        kl, tl = kltls[n].split("-")
        for label_i in range(len(labels["hit_label"])):
            if (kl in labels["kit_label"][label_i] and tl in labels["tech_label"][label_i]):
                ys[n] = 1
    return ys

def ys_to_labels(ys, threshold = 0.6):
    labels = {"hit_label": [], "kit_label": [], "tech_label": []}
    for n in range(len(kltls)):
        kl, tl = kltls[n].split("-")
        if (ys[n] > threshold):
            hl = "beater" if kl == "bass_drum" else "stick"
            labels["hit_label"].append(hl)
            labels["kit_label"].append(kl)
            labels["tech_label"].append(tl)
    return labels

In [4]:
class AudioGenerator(Sequence):
    def get_ys(self, labels):
        return labels_to_ys(labels)
    
    def __init__(self, filenames, labels, data_type):
        self.filenames, self.labels, self.batch_size = filenames, labels, SETTINGS.data[data_type]["batch_size"]
        
    def __len__(self):
        return int(np.floor(len(self.filenames) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.filenames[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]
        return np.array([np.loadtxt(file_name) for file_name in batch_x]), np.array(list(map(self.get_ys, batch_y)))

In [5]:
# Data generators
generators = {"training": None, "test": None}
for data_type in generators.keys():
    sample_metadata = get_file_classes(data_type)
    filenames = [sm["filepath"] for sm in sample_metadata]
    labels = [sm["labels"] for sm in sample_metadata]
    generators[data_type] = AudioGenerator(filenames, labels, data_type)

In [6]:
# Test whether generator arguments are picklable (whether they can be multiprocessed)
use_multiprocessing = True
for gen in generators:
    try:
        pickle.dumps(gen)
    except:
        print(sys.exc_info())
        use_multiprocessing = False
        break
print("Picklable:", use_multiprocessing)

Picklable: True


In [None]:
# Adapted from https://keras.io/layers/writing-your-own-keras-layers/
class InceptionModule(Layer):
    
    def __init__(self, output_dim, **kwargs):
        self.output_dim = output_dim
        super(InceptionModule, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create a trainable weight variable for this layer.
        self.conv_1_kernel = self.add_weight(name='kernel1', 
                                      shape=(input_shape[1], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_3a_kernel = self.add_weight(name='kernel3a', 
                                      shape=(input_shape[2], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_3b_kernel = self.add_weight(name='kernel3b', 
                                      shape=(input_shape[3], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_5a_kernel = self.add_weight(name='kernel5a', 
                                      shape=(input_shape[4], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_5b_kernel = self.add_weight(name='kernel5b', 
                                      shape=(input_shape[5], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_7a_kernel = self.add_weight(name='kernel7a', 
                                      shape=(input_shape[6], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        self.conv_7b_kernel = self.add_weight(name='kernel7b', 
                                      shape=(input_shape[7], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        super(InceptionModule, self).build(input_shape)  # Be sure to call this at the end

    def call(self, x):
        # Skip connection (uses input in concat)
        skip = x
        # Size 1 kernal conv of input (with tanh activation)
        conv_1_tower = K.conv1d(x, self.conv_1_kernal)
        conv_1_tower = K.tanh(conv_1_tower)
        # Size 1 -> size 3 kernal conv of input (with tanh activation)
        conv_3_tower = K.conv1D(x, self.conv_3a_kernal, padding="same")
        conv_3_tower = K.conv1D(conv_3_tower, self.conv_3b_kernal, padding="causal")
        conv_3_tower = K.tanh(conv_3_tower)
        # Size 1 -> size 5 kernal conv of input (with tanh activation)
        conv_5_tower = K.conv1D(x, self.conv_5a_kernal, padding="same")
        conv_5_tower = K.conv1D(conv_5_tower, self.conv_5b_kernal, padding="causal")
        conv_5_tower = K.tanh(conv_5_tower)
        # Size 1 -> size 7 kernal conv of input (with tanh activation)
        conv_7_tower = K.conv1D(x, self.conv_7a_kernal, padding="same")
        conv_7_tower = K.conv1D(conv_7_tower, self.conv_7b_kernal, padding="causal")
        conv_7_tower = K.tanh(conv_7_tower)
        # Concatenate all activation images
        return K.concatenate([skip, conv_1_tower, conv_3_tower, conv_5_tower, conv_7_tower], axis=1)

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)

In [None]:
# Instantiate model
model = Sequential()

# Reusable dausal convolution layer structure
def CausalConvActLayer(nn, kernal_size, strides, filters):
    nn.add(Conv1D(filters=filters, kernal_size=kernal_size, strides=strides, padding="causal"))
    nn.add(Activation("tanh"))
    
# Reusable dilated convolution / inception module / dropout layer
def DilatedInceptionModuleLayer(nn, drop_rate):
    nn.add(Conv1D(kernal_size=3, padding="causal", dilation_rate=2))
    nn.add(Activation("tanh"))
    nn.add(InceptionModule())
    nn.add(Activation("tanh"))
    nn.add(Dropout(drop_rate))

cca_layers = [
    {"kernal_size": 7,
    "strides": 3,
    "filters": 50},
    {"kernal_size": 7,
    "strides": 2,
    "filters": 25},
    {"kernal_size": 5,
    "strides": 2,
    "filters": 10}
]
dim_layers = [
    {
        "drop_rate": 0.1
    },
    {
        "drop_rate": 0.15
    },
    {
        "drop_rate": 0.2
    },
    {
        "drop_rate": 0.25
    },
    {
        "drop_rate": 0.3
    }
]

# MODEL
"""
Rationale: 

3 "CausalConvAct" convolution layers which reduce the size of the sample space while increasing the size of the convolution space.
- Providing downscaling
(Feature extraction, while preserving temporal relationships)

Then "DilatedInceptionModule" layers which retain the size of the sample space while increasing the depth of the 
"""
for cca_layer in cca_layers:
    CausalConvActLayer(model, *cca_layer)
model.add(Dropout(0.05))
for dim_layer in dim_layers:
    DilatedInceptionModuleLayer(model, *dim_layer)
model.add(Activation("sigmoid"))
model.compile()

In [None]:
# Train model
model.fit_generator(generator=generators["training"],
                   validation_data=generators["validation"],
                   use_multiprocessing=use_multiprocessing,
                   workers=6)