# Model Comparison: Lightpad

In this notebook, we will compare multiple models for predicting ROLI Lightpad block data.

First, let's have a look at the data.

In [None]:
import keras
from keras import backend as K
from keras.layers import Dense, Input
import numpy as np
import tensorflow as tf
import math
import h5py
import random
import time
import pandas as pd
from context import * # imports MDN
import matplotlib.pyplot as plt
%matplotlib inline

input_colour = 'darkblue'
gen_colour = 'firebrick'
plt.style.use('seaborn-talk')

In [None]:
# Load and process raw OSC saved data
data = pd.read_csv("../datasets/2018-06-21-block-perf1.txt")
data['dt'] = data.time.diff().fillna(0)
data['x'] = data.message.apply((lambda x: int(x.split(' ')[2]))) / 127.0
data['y'] = data.message.apply((lambda x: int(x.split(' ')[3]))) / 127.0
data['z'] = data.message.apply((lambda x: int(x.split(' ')[4]))) / 127.0
# Save as numpy array
cleaned_data = data[['x', 'y', 'z', 'dt']]
array_version = np.array(cleaned_data)
np.savez('roli-block-session-data.npz', array_version)

# Have a look at some details:
print("Here's an excerpt of data:")
print(cleaned_data[100:110])
cleaned_data.describe()

In [None]:
plt.boxplot(cleaned_data.dt)
#plt.yscale("log")
plt.ylim(0.02,0.04)
plt.show()

In [None]:
# Test loading
with np.load('roli-block-session-data.npz') as data:
    dataset = data['arr_0']
print("Here's the data shape:")
print(dataset.shape)
print("Corpus data points between 100 and 120:")
print(dataset[100:120])
print("Some statistics about the dataset:")
pd.DataFrame(dataset).describe()

In [None]:
def perf_df_to_array(perf_df, xylim=1.0, dtlim=5.0):
    """Converts a dataframe of a performance into array a,b,dt format."""
    perf_df['dt'] = perf_df.time.diff()
    perf_df.dt = perf_df.dt.fillna(0.0)
    perf_df.set_value(perf_df[perf_df.dt > 5].index, 'dt', dtlim)
    perf_df.set_value(perf_df[perf_df.dt < 0].index, 'dt', 0.0)
    perf_df.set_value(perf_df[perf_df.x > 1].index, 'x', xylim)
    perf_df.set_value(perf_df[perf_df.x < 0].index, 'x', 0.0)
    perf_df.set_value(perf_df[perf_df.y > 1].index, 'y', xylim)
    perf_df.set_value(perf_df[perf_df.y < 0].index, 'y', 0.0)
    return np.array(perf_df[['x', 'y', 'dt']])

def perf_array_to_df(perf_array):
    """Converts an array of a performance (a,b,dt format) into a dataframe."""
    perf_array = perf_array.T
    perf_df = pd.DataFrame({'x': perf_array[0], 'y': perf_array[1], 'z': perf_array[2], 'dt': perf_array[3]})
    perf_df['time'] = perf_df.dt.cumsum()
    # As a rule of thumb, could classify taps with dt>0.1 as taps, dt<0.1 as moving touches.
    perf_df['moving'] = 1
    perf_df.set_value(perf_df[perf_df.dt > 0.1].index, 'moving', 0)
    perf_df = perf_df.set_index(['time'])
    return perf_df[['x', 'y', 'z', 'moving']]


def random_touch():
    """Generate a random tiny performance touch."""
    return np.array([np.random.rand(), np.random.rand(), 0.01])


def constrain_touch(touch):
    """Constrain touch values from the MDRNN"""
    touch[0] = min(max(touch[0], 0.0), 1.0)  # x in [0,1]
    touch[1] = min(max(touch[1], 0.0), 1.0)  # y in [0,1]
    touch[2] = max(touch[2], 0.001)  # dt # define minimum time step
    return touch

def generate_random_tiny_performance(model, n_mixtures, first_touch, time_limit=5.0, steps_limit=1000, temp=1.0):
    """Generates a tiny performance up to 5 seconds in length."""
    time = 0
    steps = 0
    previous_touch = first_touch
    performance = [previous_touch.reshape((3,))]
    while (steps < steps_limit and time < time_limit):
        params = model.predict(previous_touch.reshape(1,1,3))
        previous_touch = mdn.sample_from_output(params[0], 3, n_mixtures, temp=temp)
        output_touch = previous_touch.reshape(3,)
        output_touch = constrain_touch(output_touch)
        performance.append(output_touch.reshape((3,)))
        steps += 1
        time += output_touch[2]
    return np.array(performance)


def condition_and_generate(model, perf, n_mixtures, time_limit=5.0, steps_limit=1000, temp=1.0):
    """Conditions the network on an existing tiny performance, then generates a new one."""
    time = 0
    steps = 0
    # condition
    for touch in perf:
        params = model.predict(touch.reshape(1,1,3))
        previous_touch = mdn.sample_from_output(params[0], 3, n_mixtures, temp=temp)
    output = [previous_touch.reshape((3,))]
    while (steps < steps_limit and time < time_limit):
        params = model.predict(previous_touch.reshape(1,1,3))
        previous_touch = mdn.sample_from_output(params[0], 3, n_mixtures, temp=temp)
        output_touch = previous_touch.reshape(3,)
        output_touch = constrain_touch(output_touch)
        output.append(output_touch.reshape((3,)))
        steps += 1
        time += output_touch[2]
    net_output = np.array(output)
    return net_output

def divide_performance_into_swipes(perf_df):
    """Divides a performance into a sequence of swipe dataframes for plotting."""
    touch_starts = perf_df[perf_df.moving == 0].index
    performance_swipes = []
    remainder = perf_df
    for att in touch_starts:
        swipe = remainder.iloc[remainder.index < att]
        performance_swipes.append(swipe)
        remainder = remainder.iloc[remainder.index >= att]
    performance_swipes.append(remainder)
    return performance_swipes

def plot_2D(perf_df, name="foo", saving=False):
    """Plot a 2D representation of a performance 2D"""
    swipes = divide_performance_into_swipes(perf_df)
    plt.figure(figsize=(8, 8))
    for swipe in swipes:
        p = plt.plot(swipe.x, swipe.y, 'o-')
        plt.setp(p, color=gen_colour, linewidth=5.0)
    plt.ylim(1.0,0)
    plt.xlim(0,1.0)
    plt.xticks([])
    plt.yticks([])
    if saving:
        plt.savefig(name+".png", bbox_inches='tight')
        plt.close()
    else:
        plt.show()
        
def plot_double_2d(perf1, perf2, name="foo", saving=False):
    """Plot two performances in 2D"""
    plt.figure(figsize=(8, 8))
    swipes = divide_performance_into_swipes(perf1)
    for swipe in swipes:
        p = plt.plot(swipe.x, swipe.y, 'o-')
        plt.setp(p, color=input_colour, linewidth=5.0)
    swipes = divide_performance_into_swipes(perf2)
    for swipe in swipes:
        p = plt.plot(swipe.x, swipe.y, 'o-')
        plt.setp(p, color=gen_colour, linewidth=5.0)
    plt.ylim(1.0,0)
    plt.xlim(0,1.0)
    plt.xticks([])
    plt.yticks([])
    if saving:
        plt.savefig(name+".png", bbox_inches='tight')
        plt.close()
    else:
        plt.show()

In [None]:
### Let's have a look at some data:
plot_2D(perf_array_to_df(dataset[1500:1530]))
# perf_array_to_df(dataset[1500:1700])

plot_2D(perf_array_to_df(dataset[2000:2050]))

## Hyperparameters for experiments



In [None]:
# Training Hyperparameters:
SEQ_LEN = 30
BATCH_SIZE = 256
HIDDEN_UNITS = 128
EPOCHS = 30
VAL_SPLIT=0.2
SEED = 2345  # 2345 seems to be good.
random.seed(SEED)
np.random.seed(SEED)

## Prepare data examples for training



In [None]:
def slice_sequence_examples(sequence, num_steps):
    xs = []
    for i in range(len(sequence) - num_steps - 1):
        example = sequence[i: i + num_steps]
        xs.append(example)
    print("Total training examples:", str(len(xs)))
    return xs

def seq_to_singleton_format(examples):
    """
    Return the examples in seq to singleton format.
    """
    xs = []
    ys = []
    for ex in examples:
        xs.append(ex[:-1])
        ys.append(ex[-1])
    return (xs,ys)


X, y = seq_to_singleton_format(slice_sequence_examples(dataset, SEQ_LEN+1))
X = np.array(X)
y = np.array(y)

print("X:", X.shape)
print("y:", y.shape)

print("Here's an example X")
plot_2D(perf_array_to_df(X[np.random.randint(len(X))]))

## Deterministic RNN

Now we'll define a deterministic RNN for this dataset:

In [None]:
encoder = keras.Sequential()
encoder.add(keras.layers.LSTM(HIDDEN_UNITS, batch_input_shape=(None,SEQ_LEN,4), return_sequences=True))
encoder.add(keras.layers.LSTM(HIDDEN_UNITS))
encoder.add(keras.layers.Dense(4, activation='relu'))
encoder.compile(loss='mse', optimizer=keras.optimizers.Adam())
encoder.summary()

In [None]:
history = encoder.fit(X, y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=VAL_SPLIT)

# Save the Model
encoder.save('lightpad-deterministic-rnn.h5')  # creates a HDF5 file of the model

# Plot the loss
%matplotlib inline
plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

In [None]:
# Plot the loss
%matplotlib inline
plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

### Test out the Deterministic Model

In [None]:
# Deterministic Decoding Model
decoder = keras.Sequential()
decoder.add(keras.layers.LSTM(HIDDEN_UNITS, batch_input_shape=(1,1,4), return_sequences=True, stateful=True))
decoder.add(keras.layers.LSTM(HIDDEN_UNITS, stateful=True))
decoder.add(keras.layers.Dense(4, activation='relu'))
decoder.compile(loss='mse', optimizer=keras.optimizers.Adam())
decoder.summary()

# decoder.set_weights(model.get_weights())
decoder.load_weights("lightpad-deterministic-rnn.h5")

In [None]:
# Sample a performance

encoder.reset_states()

n = np.array([np.random.rand(), np.random.rand(), np.random.rand(), 0.02])
n = np.array([0.5, 0.5, 0.5, 0.02])

#print(next)
points = []
points.append(n)

for i in range(100):
    n = decoder.predict(np.array([[next]]))[0]
    points.append(n)

output = np.array(points)

plot_2D(perf_array_to_df(output))