In [None]:
import json
import os
import random

from keras import layers
from keras.models import Sequential
from matplotlib import collections  as mc
from matplotlib import pyplot as plt
from sklearn.cluster import KMeans
import numpy as np
import keras

In [None]:
# Seed Python's and NumPy's global random number generators so that any
# code drawing from them behaves the same on every run.
# NOTE(review): nothing in this notebook seeds the Keras backend RNG, so
# weight initialisation and dropout presumably still vary between runs -
# confirm whether full reproducibility is required.
random.seed(1337)
np.random.seed(451)

In [None]:
def data_from_file(file):
    """Load and return the JSON document stored at `file`.

    Parameters
    ----------
    file : str or path-like
        Path of the JSON file to read.

    Returns
    -------
    The decoded JSON value (whatever `json.load` produces for the file).

    Raises
    ------
    OSError
        If the file cannot be opened.
    json.JSONDecodeError
        If the file does not contain valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's
    # default locale encoding when decoding the file.
    with open(file, encoding='utf-8') as f:
        return json.load(f)

In [None]:
# Dataset
# ======================================================================
# JSON file loaded with data_from_file(); later cells read its 'args' and
# 'sequence' entries.
dataset_path = './files/mono-experiment/tomomi-original-99cls-3pca-10db.json'
# When enabled, each step's 'class_dynamic' / 'class_duration' value is
# combined with its 'class_sound' into one encoded class id.
use_dynamic_class = True
use_duration_class = True

# Hyperparameters
# ======================================================================
batch_size = 64          # How many samples are in a batch
seq_len = 3              # Number of consecutive steps in one input sample
data_split = 1/3         # Fraction of all steps held out, shared equally by the validation and test sets

num_layers = 1           # Total number of stacked LSTM layers
num_units = 64           # Units per LSTM layer (also used as the embedding size)
epochs = 100             # How many epochs to train
dropout = 0.5            # Dropout rate applied after every LSTM layer (0 disables it)

# Model
# ======================================================================
# Destination file for the trained model.
# NOTE(review): this file name mentions 198cls/2pca/6db while dataset_path
# mentions 99cls/3pca/10db - confirm the two are meant to differ.
model_path = './mono-sequence-original-5-198cls-2pca-6db-100ms.h5'

In [None]:
# Load mono sequence data
dataset = data_from_file(dataset_path)

# Show the dataset's 'args' entry; later cells read 'n_clusters',
# 'n_dynamics' and 'n_durations' from it.
print(dataset['args'])

In [None]:
# Convert dataset to our training sequence data
#
# Every step is reduced to a single integer class id. With only the sound
# class enabled the id is the sound class itself; otherwise the enabled
# per-step classes are treated as a multi-dimensional index and flattened
# with np.ravel_multi_index.

n_sound_classes = dataset['args']['n_clusters'] + 1 # plus silence class
n_dynamic_classes = dataset['args']['n_dynamics']
n_duration_classes = dataset['args']['n_durations']

# Step keys that take part in the encoding, with the class count of each
class_keys = ['class_sound']
class_counts = [n_sound_classes]

if use_dynamic_class:
    class_keys.append('class_dynamic')
    class_counts.append(n_dynamic_classes)

if use_duration_class:
    class_keys.append('class_duration')
    class_counts.append(n_duration_classes)

# One axis per enabled class type; only its shape is used for the encoding
feature_matrix = np.zeros(tuple(class_counts))

# Encode sequence
if len(class_keys) == 1:
    sequence_data = [step['class_sound'] for step in dataset['sequence']]
else:
    sequence_data = [
        np.ravel_multi_index([step[key] for key in class_keys],
                             feature_matrix.shape)
        for step in dataset['sequence']
    ]
sequence_data = np.array(sequence_data)

# Class ids start at 0, so the number of classes is the highest id plus one
print('Total classes: {}, steps: {}'.format(np.max(sequence_data) + 1, len(sequence_data)))

In [None]:
def generator(data, seq_len, min_index, max_index, samples_per_batch=None):
    """Yield endless (samples, targets) batches for next-step prediction.

    Each sample is a window of `seq_len` consecutive entries of `data` and
    its target is the entry that directly follows the window. Only windows
    whose sample and target both lie inside [min_index, max_index) are
    produced; after the last one the generator starts again at `min_index`.

    Parameters
    ----------
    data : array-like of int
        One-dimensional sequence of class ids.
    seq_len : int
        Number of steps per sample.
    min_index : int
        First index of `data` that may be used (inclusive).
    max_index : int
        End of the usable range (exclusive).
    samples_per_batch : int, optional
        Maximum number of samples per batch. Defaults to the notebook-level
        `batch_size`.

    Yields
    ------
    samples : int32 array of shape (n, seq_len)
    targets : int32 array of shape (n,)
        `n` equals `samples_per_batch`, except for the last batch of a pass
        which may be smaller.

    Raises
    ------
    ValueError
        If the batch size is not positive or the range is too short to
        hold a single window plus its target.
    """
    if samples_per_batch is None:
        samples_per_batch = batch_size  # notebook-level hyperparameter
    if samples_per_batch < 1:
        raise ValueError('samples_per_batch must be at least 1')

    data = np.asarray(data)

    # Exclusive upper bound for window starts: the target of a window
    # starting at `s` sits at `s + seq_len`, which must stay below max_index.
    stop = max_index - seq_len
    if stop <= min_index:
        raise ValueError('Range [{}, {}) is too short for seq_len={}'.format(
            min_index, max_index, seq_len))

    offsets = np.arange(seq_len)
    i = min_index
    while True:
        if i >= stop:
            i = min_index
        rows = np.arange(i, min(i + samples_per_batch, stop))
        i += len(rows)
        # Broadcasting starts against offsets gives every window in one go
        samples = data[rows[:, None] + offsets].astype('int32')
        targets = data[rows + seq_len].astype('int32')
        yield samples, targets

In [None]:
# Partition the encoded sequence into consecutive training, validation
# and test ranges, then create one batch generator per range.
total_steps = len(sequence_data)
validation_steps = round((data_split / 2) * total_steps)

# Index boundaries of the three ranges
train_max = total_steps - 2 * validation_steps
val_min = train_max + 1
val_max = val_min + validation_steps
test_min = val_max + 1
test_max = total_steps - 1

training_steps = test_max - test_min
steps_per_epoch = train_max // batch_size

# One generator per (min_index, max_index) pair, in train/val/test order
train_gen, val_gen, test_gen = (
    generator(sequence_data,
              seq_len=seq_len,
              min_index=lower,
              max_index=upper)
    for lower, upper in ((0, train_max),
                         (val_min, val_max),
                         (test_min, test_max))
)

print('Batch size:', batch_size)
print('Steps per epoch:', steps_per_epoch)

print('\nSplit for validation & test @ {0:.2f}%'.format(data_split * 100))
for label, bounds in (('Training set:', (0, train_max)),
                      ('Validation set:', (val_min, val_max)),
                      ('Test set:', (test_min, test_max))):
    print(label, bounds)

In [None]:
# Class ids are zero-based, so the vocabulary size is the largest id + 1
n_classes = np.max(sequence_data) + 1

# Every LSTM layer except the last one hands its full output sequence to
# the next layer.
recurrent_flags = [True] * (num_layers - 1) + [False]

# Embedding -> (LSTM [-> Dropout]) per layer -> softmax over all classes
stack = [layers.Embedding(input_dim=n_classes,
                          output_dim=num_units,
                          input_length=seq_len)]
for keep_sequences in recurrent_flags:
    stack.append(layers.LSTM(num_units, return_sequences=keep_sequences))
    if dropout > 0.0:
        stack.append(layers.Dropout(dropout))
stack.append(layers.Dense(n_classes, activation='softmax'))

model = Sequential(stack)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics=['acc'])
model.summary()

In [None]:
# Train on the training generator and validate after every epoch.
#
# Keras counts `validation_steps` in batches drawn from the validation
# generator, not in samples, so the size of the validation range is
# converted to a number of batches here (at least one).
validation_batches = max(1, (val_max - val_min) // batch_size)

history = model.fit_generator(train_gen,
                              steps_per_epoch=steps_per_epoch,
                              epochs=epochs,
                              validation_data=val_gen,
                              validation_steps=validation_batches)

In [None]:
# Plot validation and training loss
loss = history.history['loss']
val_loss = history.history['val_loss']
acc = history.history['acc']
val_acc = history.history['val_acc']

# x axis for the curves. Kept under its own name so that the `epochs`
# hyperparameter is not overwritten and the training cell can be re-run.
epoch_axis = range(1, len(loss) + 1)

plt.figure()
plt.plot(epoch_axis, loss, 'r', label='Training loss')
plt.plot(epoch_axis, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

plt.figure()
plt.plot(epoch_axis, acc, 'g', label='Training accuracy')
plt.plot(epoch_axis, val_acc, 'b', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
test_temperature = 1.0

# ~~~~~~

def reweight_distribution(original_distribution, temperature):
    """Rescale probability distributions by a sampling temperature.

    Computes exp(log(p) / temperature) and renormalises along the last
    axis, so a single distribution as well as a batch of distributions
    (one per row) can be passed.

    Parameters
    ----------
    original_distribution : array-like of float
        Probabilities; the last axis runs over the classes.
    temperature : float
        Positive temperature; 1.0 leaves the distribution unchanged.

    Returns
    -------
    Array of the same shape whose last axis sums to 1.
    """
    # log(0) is -inf and maps back to probability 0; silence the warning
    with np.errstate(divide='ignore'):
        distribution = np.log(original_distribution) / temperature
    distribution = np.exp(distribution)
    return distribution / np.sum(distribution, axis=-1, keepdims=True)

# Score only the held-out test range so that steps seen during training
# do not inflate the result. As in generator(), test_max is treated as an
# exclusive bound, so every window and its target stay below it.
window_starts = np.arange(test_min, test_max - seq_len)

total_results = len(window_starts)
positive_results = 0

if total_results > 0:
    # Row r holds the seq_len steps starting at window_starts[r]
    test_sequences = sequence_data[window_starts[:, None] + np.arange(seq_len)]
    test_targets = sequence_data[window_starts + seq_len]

    # One batched prediction instead of one predict() call per window
    test_results = model.predict(test_sequences, batch_size=batch_size)

    test_results_reweighted = reweight_distribution(test_results,
                                                    test_temperature)
    test_result_classes = np.argmax(test_results_reweighted, axis=-1)

    positive_results = int(np.sum(test_result_classes == test_targets))

# Avoid a division by zero when the test range is too short for a window
ratio = positive_results / total_results if total_results else 0.0
print('Finished test with {0} positives out of {1} (score={2:.0f}%)'.format(positive_results,
                                                                            total_results,
                                                                            ratio * 100))

In [None]:
# Write the trained model to the file named by `model_path`.
model.save(model_path)