In [None]:
import math
import os
import random

import numpy as np
import tensorflow as tf
import sklearn.metrics
import h5py

In [None]:
# Location of the preprocessed DEAM archive (an HDF5 file). Set the
# DEAM_ARCHIVE_PATH environment variable to point the notebook at another copy;
# when it is unset or empty the original hard-coded location is used.
DEAM_ARCHIVE_PATH = (
    os.environ.get('DEAM_ARCHIVE_PATH')
    or '/Users/canchel/Desktop/normalized_deam_samples.hdf5'
)

# Fractions of the songs held out for testing and for validation; the
# remaining songs form the training set.
TEST_SPLIT = 0.1
VALIDATION_SPLIT = 0.1

In [None]:
# Read every song's feature, valence and arousal arrays into memory. The
# archive's top-level keys are integer song ids. The file is opened explicitly
# read-only: older h5py releases default to an append mode that can create or
# modify the file.
with h5py.File(DEAM_ARCHIVE_PATH, 'r') as in_file:
    song_ids = sorted(map(int, in_file.keys()))
    feature_dict = {song_id : np.array(in_file[str(song_id) + '/features']) for song_id in song_ids}
    valence_dict = {song_id : np.array(in_file[str(song_id) + '/valence']) for song_id in song_ids}
    arousal_dict = {song_id : np.array(in_file[str(song_id) + '/arousal']) for song_id in song_ids}

# Shuffle with a dedicated, seeded generator so the train/validation/test
# membership is the same after every kernel restart. Because song_ids is sorted
# first, the result depends only on the archive contents and SPLIT_SEED.
SPLIT_SEED = 0
random.Random(SPLIT_SEED).shuffle(song_ids)

# Split by song (not by frame): [0, split_1) is training, [split_1, split_2)
# is validation and [split_2, end) is test.
split_1 = math.ceil((1.0 - VALIDATION_SPLIT - TEST_SPLIT) * len(song_ids))
split_2 = math.ceil((1 - TEST_SPLIT) * len(song_ids))
training_song_ids = song_ids[:split_1]
validation_song_ids = song_ids[split_1:split_2]
test_song_ids = song_ids[split_2:]

def get_discrete_samples(song_ids):
    """Flatten the given songs into one feature matrix and one label matrix.

    The per-song arrays are looked up in the module-level ``feature_dict``,
    ``valence_dict`` and ``arousal_dict`` and stacked along axis 0 in the order
    of ``song_ids``.

    Args:
        song_ids: Song ids to include; each must be a key of the three dicts.

    Returns:
        ``(feature_matrix, label_matrix)``. ``label_matrix`` joins the stacked
        valence and arousal arrays along axis 1, valence first.
    """
    def stack_songs(per_song_arrays):
        # Rows of all requested songs, one song after another.
        return np.concatenate([per_song_arrays[song_id] for song_id in song_ids], axis=0)

    feature_matrix = stack_songs(feature_dict)
    valence_rows = stack_songs(valence_dict)
    arousal_rows = stack_songs(arousal_dict)
    label_matrix = np.concatenate([valence_rows, arousal_rows], axis=1)
    return feature_matrix, label_matrix

def get_sequential_samples(song_ids, clip_range=(8, 60), batch_size=4):
    """Endlessly yield batches of equally long random clips.

    Songs are looked up in the module-level ``feature_dict``, ``valence_dict``
    and ``arousal_dict``. For every batch one clip length is drawn with
    ``np.random.randint(*clip_range)`` (upper bound exclusive), then
    ``batch_size`` songs are drawn with replacement and a random window of that
    length is cut from each.

    Args:
        song_ids: Ids of the songs to sample from.
        clip_range: ``(low, high)`` passed to ``np.random.randint`` for the
            clip length.
        batch_size: Number of clips per batch.

    Yields:
        A tuple with one array per clip component: the stacked feature clips and
        the stacked label clips, where labels are valence and arousal joined
        along the last axis. With ``batch_size=0`` the tuple is empty.

    Raises:
        AssertionError: If a song's features and labels differ in length, or a
            selected song is shorter than the drawn clip length.
    """
    def random_window(feature_sequence, label_sequence, length):
        # Cut the same randomly placed window out of both sequences.
        assert len(feature_sequence) == len(label_sequence)
        assert length <= len(feature_sequence)
        latest_start = len(feature_sequence) - length
        start = np.random.randint(0, latest_start + 1)
        stop = start + length
        return feature_sequence[start:stop], label_sequence[start:stop]

    # One list per source dict (features, valences, arousals), aligned by song.
    per_song_columns = [
        [lookup[song_id] for song_id in song_ids]
        for lookup in (feature_dict, valence_dict, arousal_dict)
    ]
    candidate_pool = list(zip(*per_song_columns))

    while True:
        clip_length = np.random.randint(*clip_range)
        batch = []
        for feature_seq, valence_seq, arousal_seq in random.choices(candidate_pool, k=batch_size):
            label_seq = np.concatenate([valence_seq, arousal_seq], axis=-1)
            batch.append(random_window(feature_seq, label_seq, clip_length))
        # Transpose [(features, labels), ...] into (all features, all labels).
        yield tuple(np.array(component) for component in zip(*batch))


def evaluate(song_ids, evaluators, evaluator_names=None):
    """Print MSE and R^2 per label column for each evaluator.

    ``len(song_ids)`` clips of 8 steps are drawn from ``get_sequential_samples``
    (batch size 1). That generator picks songs at random with replacement, so
    the clips are a random sample of ``song_ids`` rather than one clip per
    song, and the printed numbers vary between runs unless the random
    generators are seeded. Every evaluator sees exactly the same clips.

    Args:
        song_ids: Ids of the songs to draw evaluation clips from.
        evaluators: Callables mapping one feature clip to a sequence of
            per-step predictions with the same columns as the labels.
        evaluator_names: Optional display names, one per evaluator. Defaults to
            ``'evaluator <index>'``.

    Raises:
        AssertionError: If ``evaluator_names`` is given but its length differs
            from ``evaluators``.
    """
    assert evaluator_names is None or len(evaluators) == len(evaluator_names)
    metric_calculator_dict = {
        'mse' : sklearn.metrics.mean_squared_error,
        'r2' : sklearn.metrics.r2_score
    }

    # A single generator serves every step; creating a new one per step would
    # rebuild its candidate pool each time without changing what is drawn.
    sample_stream = get_sequential_samples(song_ids, clip_range=(8, 9), batch_size=1)

    labels = []
    predictions = [[] for _ in evaluators]
    for _ in range(len(song_ids)):
        feature_batch, label_batch = next(sample_stream)
        # Batches hold exactly one clip; unwrap it.
        feature, label = feature_batch[0], label_batch[0]
        labels.extend(label)
        for evaluator, evaluator_predictions in zip(evaluators, predictions):
            evaluator_predictions.extend(evaluator(feature))
    labels, predictions = np.array(labels), [np.array(p) for p in predictions]

    for metric_name, metric in metric_calculator_dict.items():
        print(metric_name)
        for evaluator_index in range(len(evaluators)):
            print(evaluator_names[evaluator_index] if evaluator_names is not None else 'evaluator ' + str(evaluator_index), end=': ')
            # One score per label column, printed on the evaluator's line.
            for label_index in range(labels.shape[-1]):
                print(metric(labels[:, label_index], predictions[evaluator_index][:, label_index]), end=' ')
            print('')

In [None]:
# Baseline: a fully connected network that maps one 130-dimensional feature
# vector to two tanh outputs, with L2 weight regularisation on every layer.
mlp_model_input = tf.keras.layers.Input(shape=(130,))

# (units, activation) of each Dense layer in order: five ReLU hidden layers
# followed by the two-unit tanh output layer.
mlp_layer_specs = [(32, 'relu')] * 5 + [(2, 'tanh')]

mlp_output = mlp_model_input
for units, activation in mlp_layer_specs:
    mlp_output = tf.keras.layers.Dense(
        units=units,
        activation=activation,
        kernel_regularizer=tf.keras.regularizers.l2(1e-4)
    )(mlp_output)

mlp_model = tf.keras.models.Model(inputs=mlp_model_input, outputs=mlp_output)
mlp_model.compile(
    loss=tf.keras.losses.MSE,
    optimizer=tf.keras.optimizers.Adam()
)

def mlp_evaluator(feature_sequence):
    """Predict with ``mlp_model`` for every row of ``feature_sequence``.

    Args:
        feature_sequence: Array whose rows are feature vectors matching the
            model's 130-dimensional input.

    Returns:
        Array with one two-value prediction per input row.
    """
    # verbose=0: evaluate() calls this once per clip, and depending on the
    # TensorFlow version the default verbosity prints a progress bar per call.
    return mlp_model.predict(feature_sequence, verbose=0)

In [None]:
# Train the MLP on the flattened training samples for 16 epochs, halving the
# learning rate at every even-numbered epoch after the first (epochs 2, 4, ...).
mlp_training_features, mlp_training_labels = get_discrete_samples(training_song_ids)
mlp_lr_halving = tf.keras.callbacks.LearningRateScheduler(
    lambda epoch_index, learning_rate: (
        learning_rate / 2.0
        if epoch_index > 0 and epoch_index % 2 == 0
        else learning_rate
    )
)
mlp_model.fit(
    mlp_training_features,
    mlp_training_labels,
    epochs=16,
    batch_size=32,
    callbacks=[mlp_lr_halving]
)

In [None]:
# Encoder-decoder without attention: an LSTM condenses the 8-step input clip
# into a single vector, which is repeated 8 times and decoded by a second LSTM
# into one two-value tanh output per step.
seq2seq_model_input = tf.keras.layers.Input(shape=(8, 130))

seq2seq_layers = [
    tf.keras.layers.LSTM(32, return_sequences=False),
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.RepeatVector(8),
    tf.keras.layers.LSTM(32, return_sequences=True),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(2, activation='tanh')),
]

# Chain the layers in the order listed above.
seq2seq_output = seq2seq_model_input
for seq2seq_layer in seq2seq_layers:
    seq2seq_output = seq2seq_layer(seq2seq_output)

seq2seq_model = tf.keras.models.Model(inputs=seq2seq_model_input, outputs=seq2seq_output)
seq2seq_model.compile(
    loss=tf.keras.losses.MSE,
    optimizer=tf.keras.optimizers.Adam()
)

def seq2seq_evaluator(feature_sequence):
    """Predict with ``seq2seq_model`` for a single clip.

    Args:
        feature_sequence: One clip matching the model's ``(8, 130)`` input.

    Returns:
        The model output for that clip (the batch axis is removed).
    """
    # The model expects a batch axis, so wrap the clip in a batch of one.
    # verbose=0: evaluate() calls this once per clip, and depending on the
    # TensorFlow version the default verbosity prints a progress bar per call.
    return seq2seq_model.predict(np.array([feature_sequence]), verbose=0)[0]

In [None]:
# Train on an endless stream of random 8-step clips. An "epoch" is defined as
# len(training_song_ids) batches; the learning rate is halved at every
# even-numbered epoch after the first (epochs 2, 4, ...).
seq2seq_training_batches = get_sequential_samples(training_song_ids, clip_range=(8, 9))
seq2seq_lr_halving = tf.keras.callbacks.LearningRateScheduler(
    lambda epoch_index, learning_rate: (
        learning_rate / 2.0
        if epoch_index > 0 and epoch_index % 2 == 0
        else learning_rate
    )
)
seq2seq_model.fit(
    seq2seq_training_batches,
    epochs=16,
    steps_per_epoch=len(training_song_ids),
    callbacks=[seq2seq_lr_halving]
)

In [None]:
# Layers that must be reused across decoding steps are stored here, keyed by
# the name of the structure that owns them. Re-running this cell resets it.
shared_layer_cache = dict()

# Encoder: a GRU over the 8-step clip that returns both its per-step outputs
# (attended over by the decoder) and its final state.
seq2seq_attention_model_input = tf.keras.layers.Input(shape=(8, 130))
attention_encoder_gru = tf.keras.layers.GRU(32, return_sequences=True, return_state=True)
encoder_output, encoder_state = attention_encoder_gru(seq2seq_attention_model_input)

def attention_decoder_layer():
    """Return a function that builds one attention-based decoding step.

    The layers used by the step are created once, on first use, and stored in
    the module-level ``shared_layer_cache`` under ``'attention_decoder_layer'``.
    Every step built through this factory therefore reuses the same layers.
    """
    def structure(previous_output, encoder_state, encoder_output):
        """Build one decoding step.

        Args:
            previous_output: Output of the previous step. It is concatenated
                with the context vector (which gets a length-1 axis 1) along
                the last axis.
            encoder_state: State used as the attention query. It only takes
                part in scoring; the GRU below is called without an initial
                state.
            encoder_output: Sequence that is attended over along axis 1.

        Returns:
            ``(output, decoder_state, attention_weights)``: the step's
            prediction, the state returned by the step's GRU, and the softmax
            weights over axis 1 of ``encoder_output``.
        """
        global shared_layer_cache
        # Add a length-1 axis so the projected state can be added to the
        # projected encoder outputs (relies on broadcasting along axis 1).
        encoder_state = tf.keras.backend.expand_dims(encoder_state, axis=1)
        # Create the shared layers on the first call only.
        if 'attention_decoder_layer' not in shared_layer_cache:
            shared_layer_cache['attention_decoder_layer'] = {
                'encoder_state_weight_layer_1' : tf.keras.layers.Dense(32),
                'encoder_output_weight_layer_1' : tf.keras.layers.Dense(32),
                'score_weight_layer_1' : tf.keras.layers.Dense(1),
                'output_gru_layer_1' : tf.keras.layers.GRU(32, return_sequences=True, return_state=True),
                'output_dense_layer_1' : tf.keras.layers.Dense(2, activation='tanh')
            }
        cache = shared_layer_cache['attention_decoder_layer']
        # Additive scoring: project state and encoder outputs, sum them, apply
        # tanh, then reduce each position to a single score.
        score = cache['encoder_state_weight_layer_1'](encoder_state) + cache['encoder_output_weight_layer_1'](encoder_output)
        score = cache['score_weight_layer_1'](tf.keras.layers.Activation('tanh')(score))
        # Normalise the scores over axis 1 (the encoder positions).
        attention_weights = tf.keras.layers.Softmax(axis=1)(score)
        # Context vector: attention-weighted sum of the encoder outputs.
        context_vector = tf.keras.backend.sum(attention_weights * encoder_output, axis=1)
        # Feed the previous prediction together with the context to the GRU.
        context_concat = tf.keras.layers.Concatenate(axis=-1)([previous_output, tf.keras.backend.expand_dims(context_vector, axis=1)])
        output, decoder_state = cache['output_gru_layer_1'](context_concat)
        # Map the GRU output to the two tanh-activated output values.
        output = cache['output_dense_layer_1'](output)
        return output, decoder_state, attention_weights
    return structure

# Initial "previous prediction" for the first decoding step: zeros of shape
# (batch, 1, 2). It is derived from encoder_output inside a Lambda layer so
# that its batch dimension follows the model input instead of being fixed at
# 4 by a constant; callers that feed batches of 4 keep working unchanged.
previous_output = tf.keras.layers.Lambda(
    lambda encoded: tf.zeros_like(encoded[:, :1, :2])
)(encoder_output)

# The first attention query is the encoder's final state; afterwards each step
# queries with the state returned by the previous step.
decoder_state = encoder_state

# The step builder is stateless (its layers live in shared_layer_cache), so
# one instance is reused for all 8 unrolled steps.
decoder_step = attention_decoder_layer()
outputs = []
for _ in range(8):
    previous_output, decoder_state, _attention_weights = decoder_step(previous_output, decoder_state, encoder_output)
    outputs.append(previous_output)

# Join the 8 single-step outputs along the time axis.
final_output = tf.keras.layers.Concatenate(axis=1)(outputs)
seq2seq_attention_model = tf.keras.models.Model(inputs=seq2seq_attention_model_input, outputs=final_output)

seq2seq_attention_model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.MSE
)

def seq2seq_attention_evaluator(feature_sequence):
    """Predict with ``seq2seq_attention_model`` for a single clip.

    Args:
        feature_sequence: One clip matching the model's ``(8, 130)`` input.

    Returns:
        The model output for that clip (the batch axis is removed).
    """
    # The clip is replicated into a batch of 4 (the default batch size of
    # get_sequential_samples, which this model is trained with); all four rows
    # are identical, so only the first prediction is returned.
    # verbose=0: evaluate() calls this once per clip, and depending on the
    # TensorFlow version the default verbosity prints a progress bar per call.
    return seq2seq_attention_model.predict(np.array([feature_sequence] * 4), verbose=0)[0]

In [None]:
# Train on an endless stream of random 8-step clips. An "epoch" is defined as
# len(training_song_ids) batches; the learning rate is halved at every
# even-numbered epoch after the first (epochs 2, 4, ...).
attention_training_batches = get_sequential_samples(training_song_ids, clip_range=(8, 9))
attention_lr_halving = tf.keras.callbacks.LearningRateScheduler(
    lambda epoch_index, learning_rate: (
        learning_rate / 2.0
        if epoch_index > 0 and epoch_index % 2 == 0
        else learning_rate
    )
)
seq2seq_attention_model.fit(
    attention_training_batches,
    epochs=16,
    steps_per_epoch=len(training_song_ids),
    callbacks=[attention_lr_halving]
)

In [None]:
# Compare the three trained models on clips drawn from the held-out test songs.
evaluators_by_name = {
    'MLP': mlp_evaluator,
    'seq2seq': seq2seq_evaluator,
    'seq2seq_attention': seq2seq_attention_evaluator,
}
evaluate(test_song_ids, list(evaluators_by_name.values()), list(evaluators_by_name.keys()))