In [1]:
import tensorflow as tf
from tensorflow import keras
from keras.layers import Conv1D, BatchNormalization, GlobalAveragePooling1D, Permute, Dropout, Flatten, Reshape, multiply
from keras.layers import Input, Dense, LSTM, concatenate, Activation, GRU, SimpleRNN, Masking
import numpy as np
from tqdm import tqdm
import datetime

In [4]:
def squeeze_excite_block(input, ratio=16):
    ''' Create a squeeze-excite block

    The input is globally average-pooled, passed through a two-layer
    bottleneck (relu, then sigmoid) and the resulting per-channel gates are
    multiplied back onto the input.

    Args:
        input: input keras tensor; its last axis is used as the channel axis
            and must have a statically known size
        ratio: reduction ratio of the bottleneck. The hidden layer gets
            ``max(1, channels // ratio)`` units, so inputs with fewer than
            `ratio` channels still get a valid (1-unit) bottleneck.

    Returns: a keras tensor

    Raises:
        ValueError: if the channel size is unknown or `ratio` is smaller than 1
    '''
    filters = input.shape[-1] # channel_axis = -1 for TF
    if filters is None:
        raise ValueError("squeeze_excite_block needs a statically known channel dimension")
    if ratio < 1:
        raise ValueError("ratio must be >= 1, got %r" % (ratio,))
    filters = int(filters)

    # A plain `filters // ratio` is 0 for narrow inputs, which would create a
    # Dense layer without any units; clamp the bottleneck width to at least 1.
    bottleneck = max(1, filters // ratio)

    se = GlobalAveragePooling1D()(input)
    se = Reshape((1, filters))(se)  # keep a length-1 middle axis so the gates broadcast over it
    se = Dense(bottleneck, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)
    se = multiply([input, se])
    return se


def build_model(input_shape, batch_size, num_classes):
    ''' Assemble the two-branch (LSTM + squeeze-excite convolution) classifier

    One branch feeds the transposed input through Masking, an 8-unit LSTM and
    dropout; the other runs three Conv1D/BatchNorm/ReLU stages (the first two
    followed by a squeeze-excite block) and global average pooling. Both
    branches are concatenated and classified by a softmax Dense layer.

    Args:
        input_shape: shape of a single sample, (len, dim)
        batch_size: static batch size declared on the Input layer
        num_classes: number of units of the final softmax layer

    Returns: an uncompiled keras Model
    '''
    inputs = Input(shape=input_shape, batch_size=batch_size)  # (len, dim)
    transposed = Permute((2, 1))(inputs)                      # (dim, len)

    # Recurrent branch, fed with the transposed tensor.
    recurrent = Masking()(transposed)
    recurrent = LSTM(8)(recurrent)
    recurrent = Dropout(0.8)(recurrent)

    # Convolutional branch: the second Permute swaps the axes back to (len, dim).
    conv = Permute((2, 1))(transposed)
    stages = ((128, 8, True), (256, 5, True), (128, 3, False))  # (filters, kernel size, squeeze-excite?)
    for n_filters, kernel_size, with_se in stages:
        conv = Conv1D(n_filters, kernel_size, padding='same', kernel_initializer='he_uniform')(conv)
        conv = BatchNormalization()(conv)
        conv = Activation('relu')(conv)
        if with_se:
            conv = squeeze_excite_block(conv)
    conv = GlobalAveragePooling1D()(conv)

    merged = concatenate([recurrent, conv])
    outputs = Dense(num_classes, activation='softmax')(merged)

    # add load model code here to fine-tune
    return keras.Model(inputs, outputs)


In [5]:
# Instantiate the classifier for (137, 15) samples and 5 output classes.
# NOTE(review): the static batch size is 32 here, while train_dataset is batched
# by 128 further down — confirm the fixed batch size is intended.
model = build_model(input_shape=(137, 15), batch_size=32, num_classes=5)

In [6]:
# Print the layer table (output shapes, parameter counts, connectivity) as a sanity check.
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(32, 137, 15)]              0         []                            
                                                                                                  
 permute_1 (Permute)         (32, 15, 137)                0         ['input_2[0][0]']             
                                                                                                  
 permute_2 (Permute)         (32, 137, 15)                0         ['permute_1[0][0]']           
                                                                                                  
 conv1d (Conv1D)             (32, 137, 128)               15488     ['permute_2[0][0]']           
                                                                                              

In [8]:
# Read both arrays inside a context manager so the .npz archive is closed
# as soon as they are in memory (np.load keeps the file open otherwise).
with np.load("mfcc.npz") as f:
    X, Y = f['X'], f['Y']

# Samples 200-299 are held out as the test split; samples 0-199 and 300-999
# form the training pool (split again into train/validation further down).
x_train = np.concatenate((X[0:200], X[300:1000]))
y_train = np.concatenate((Y[0:200], Y[300:1000]))

x_test, y_test = X[200:300], Y[200:300]

In [9]:
# Everything after the first 800 training-pool samples is used for validation.
x_val, y_val = x_train[800:], y_train[800:]

# The first 800 samples are reshuffled every epoch and served in full batches
# of 128 (the incomplete last batch is dropped).
train_dataset = tf.data.Dataset.from_tensor_slices((x_train[:800], y_train[:800]))
train_dataset = train_dataset.shuffle(buffer_size=800, reshuffle_each_iteration=True)
train_dataset = train_dataset.batch(128, drop_remainder=True)

In [None]:
# model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# model.fit(x_train, y_train, epochs=1000, batch_size=32, validation_data=(x_test, y_test))


In [None]:
# Optimiser, accuracy metric and loss shared by the training step and the
# per-epoch evaluation. The loss takes probabilities (from_logits=False)
# because the model ends in a softmax layer.
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
acc_metric = keras.metrics.SparseCategoricalAccuracy()
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)


@tf.function
def training_step(x, y):
    """Run one optimisation step on a single batch.

    Args:
        x: batch of model inputs
        y: matching integer class labels

    Returns:
        (loss, acc): the batch loss and the accuracy of this batch alone —
        the shared metric is reset right after it is read.
    """
    weights = model.trainable_weights

    with tf.GradientTape() as tape:
        probs = model(x, training=True)  # softmax outputs of the model
        batch_loss = loss_fn(y, probs)

    gradients = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    # update -> read -> reset, so nothing accumulates across batches
    acc_metric.update_state(y, probs)
    batch_acc = acc_metric.result()
    acc_metric.reset_states()

    return batch_loss, batch_acc

# Per-epoch history. Only plain floats / NumPy values are stored, so the file
# written below does not embed TensorFlow tensors.
log = {"training_loss":[], "training_acc":[],
        "val_loss":[], "val_acc":[], "test_acc":[], "test_logits":[]}
log_path = "log" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + ".npy"

for epoch in tqdm(range(2000)):
    # --- training: mean of the per-batch loss / accuracy over the epoch ---
    epoch_loss = 0.0
    epoch_acc = 0.0
    step = -1  # stays -1 when the dataset yields no batch at all
    for step, (x, y) in enumerate(train_dataset):
        batch_loss, batch_acc = training_step(x, y)
        epoch_loss += float(batch_loss)
        epoch_acc += float(batch_acc)
    num_batches = step + 1
    if num_batches == 0:
        raise ValueError("train_dataset yielded no batches")
    epoch_loss /= num_batches
    epoch_acc /= num_batches

    # --- validation: loss and accuracy on the full validation split ---
    val_logits = model(x_val, training=False)
    val_loss = loss_fn(y_val, val_logits)
    acc_metric.update_state(y_val, val_logits)
    val_acc = acc_metric.result().numpy()
    acc_metric.reset_states()

    # --- test: accuracy and raw model outputs on the held-out split ---
    test_logits = model(x_test, training=False)
    acc_metric.update_state(y_test, test_logits)
    test_acc = acc_metric.result().numpy()
    acc_metric.reset_states()

    log["training_loss"].append(epoch_loss)
    log["training_acc"].append(epoch_acc)
    log["val_loss"].append(float(val_loss))          # scalar tensor -> Python float
    log["val_acc"].append(val_acc)
    log["test_acc"].append(test_acc)
    log["test_logits"].append(test_logits.numpy())   # tensor -> ndarray

    # Rewritten every epoch so an interrupted run still leaves a usable log.
    np.save(log_path, [log])

log['test_acc'] = np.array(log['test_acc'])
log['val_loss'] = np.array(log['val_loss'])

# Report the test accuracy of the first epoch whose validation score
# (val_acc - val_loss) lies within 1e-6 of the best score. Epochs with a
# non-finite score are ignored; if there is no usable epoch (empty log or
# only NaN/inf losses) the reported metric stays 0.
# NOTE(review): the previous guard compared val_loss with its minimum, which is
# a different criterion from the one used for the selection — confirm that
# "max(val_acc - val_loss)" is the intended rule.
selection_score = np.asarray(log['val_acc'], dtype=float) - np.asarray(log['val_loss'], dtype=float)
finite_epochs = np.flatnonzero(np.isfinite(selection_score))

testing_metric = 0
if finite_epochs.size:
    finite_scores = selection_score[finite_epochs]
    best_epoch = finite_epochs[finite_scores.max() - finite_scores < 1e-6][0]
    testing_metric = log['test_acc'][best_epoch]
print(testing_metric)

 55%|█████▍    | 1099/2000 [04:35<03:31,  4.26it/s]