In [1]:
import csv
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.utils import np_utils, plot_model
from keras.models import Model
from keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed
from sklearn.model_selection import train_test_split

In [2]:
#load data
stream_list = []

for directory, subdirectories, files in os.walk('data'):
    for file in files:
        with open('data/' + file, 'r') as f: 
            reader = csv.reader(f)
            sub_list = [list(map(float,rec)) for rec in csv.reader(f, delimiter=',')]
            stream_list = stream_list + sub_list

In [None]:
!ls /datasets/data

In [3]:
#divide data in train and validation sets
train_X = stream_list[:-1000]
valid_X = stream_list[-1000:]

In [4]:
#create seperate data structures for each variable (offset, quarterlength, pitch)
#normalise offset and quarterlength
offs = []
qlngth = []
ptch = []

offs_v = []
qlngth_v = []
ptch_v = []

offsb = max(element[0] for element in stream_list)
qlngthb = max(element[1] for element in stream_list)
#ptchb = 127.0

for row in train_X:
    offs.append(row[0]/offsb)
    qlngth.append(row[1]/qlngthb)
    ptch.append(row[2])

for row in valid_X:
    offs_v.append(row[0]/offsb)
    qlngth_v.append(row[1]/qlngthb)
    ptch_v.append(row[2])

In [5]:
#divide the sets in sequences of specific length 
dtlngth=np.shape(train_X)[0]
dtlngth_v=np.shape(valid_X)[0]
seq_length = 30#100 groups of 3

dataX_o = []
dataX_q = []
dataX_p = []

dataX_o_v = []
dataX_q_v = []
dataX_p_v = []

for i in range(0, dtlngth - seq_length, 1):
	seq_in_o = offs[i:i + seq_length]
	seq_in_q = qlngth[i:i + seq_length]
	seq_in_p = ptch[i:i + seq_length]

	dataX_o.append([offs for offs in seq_in_o])
	dataX_q.append([qlngth for qlngth in seq_in_q])
	dataX_p.append([ptch for ptch in seq_in_p])

n_patterns = len(dataX_o)
print ("Total Patterns: ", n_patterns)

for i in range(0, dtlngth_v - seq_length, 1):
	seq_in_o_v = offs_v[i:i + seq_length]
	seq_in_q_v = qlngth_v[i:i + seq_length]
	seq_in_p_v = ptch_v[i:i + seq_length]	

	dataX_o_v.append([offs for offs in seq_in_o_v])
	dataX_q_v.append([qlngth for qlngth in seq_in_q_v])
	dataX_p_v.append([ptch for ptch in seq_in_p_v])

n_patterns_v = len(dataX_o_v)
print ("Total Val Patterns: ", n_patterns_v)

Total Patterns:  10954
Total Val Patterns:  970


In [6]:
#reshape inputs to be [samples, time steps, features], one hot encode pitch output
dataX_o = np.reshape(dataX_o, (n_patterns, seq_length, 1))
dataX_q = np.reshape(dataX_q, (n_patterns, seq_length, 1))
dataX_p = np.reshape(dataX_p, (n_patterns, seq_length, 1))
dataY_p = np_utils.to_categorical(dataX_p)

dataX_o_v = np.reshape(dataX_o_v, (n_patterns_v, seq_length, 1))
dataX_q_v = np.reshape(dataX_q_v, (n_patterns_v, seq_length, 1))
dataX_p_v = np.reshape(dataX_p_v, (n_patterns_v, seq_length, 1))
dataY_p_v = np_utils.to_categorical(dataX_p_v)

In [7]:
# define encoder
inputs_o = Input(shape=(seq_length, 1), name="in_o")
inputs_q = Input(shape=(seq_length, 1), name="in_q")
inputs_p = Input(shape=(seq_length, 1), name="in_p")
#input = concat(ino, inq, inp)
lstm_en = LSTM(100, activation='relu')

en_o = lstm_en(inputs_o)
en_q = lstm_en(inputs_q)
en_p = lstm_en(inputs_p)

# define reconstruct decoder
RV = RepeatVector(seq_length)

de_o = RV(en_o)
de_q = RV(en_q)
de_p = RV(en_p)

lstm_de = LSTM(100, activation='relu', return_sequences=True)

de_o = lstm_de(de_o)
de_q = lstm_de(de_q)
de_p = lstm_de(de_p)

de_o = TimeDistributed(Dense(1, activation='sigmoid'), name="out_o")(de_o)#act relu
de_q = TimeDistributed(Dense(1, activation='sigmoid'), name="out_q")(de_q)
de_p = TimeDistributed(Dense(dataY_p.shape[2], activation='softmax'), name="out_p")(de_p)

model = Model(
    inputs= [inputs_o, inputs_q, inputs_p],
    outputs=[de_o, de_q, de_p],
)

In [None]:
model.compile(loss='categorical_crossentropy', optimizer='adam',
    metrics=["mean_squared_error"],
)

In [None]:
model.fit({"in_o": dataX_o, "in_q": dataX_q, "in_p": dataX_p}, {"out_o": dataX_o, "out_q": dataX_q, "out_p": dataY_p}, batch_size=64, epochs=2)

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7f99afb86290>

In [None]:
test_scores = model.evaluate({"in_o": dataX_o_v, "in_q": dataX_q_v, "in_p": dataX_p_v}, {"out_o": dataX_o_v, "out_q": dataX_q_v, "out_p": dataY_p_v}, verbose=2)
print("Test loss:", test_scores[0])
print("Test accuracy:", test_scores[1])

31/31 - 1s - loss: 14.1507 - out_o_loss: 1.0060e-07 - out_q_loss: 6.2046e-09 - out_p_loss: 14.1507 - out_o_mean_squared_error: 0.1178 - out_q_mean_squared_error: 0.2050 - out_p_mean_squared_error: 0.0207
Test loss: 14.150694847106934
Test accuracy: 1.0059886790259043e-07


In [None]:
yhat = model.predict({"in_o": np.reshape(dataX_o_v[1], (1, 30, 1)), "in_q": np.reshape(dataX_q_v[1], (1, 30, 1)), "in_p": np.reshape(dataX_p_v[1], (1, 30, 1))}, verbose=0)