In [None]:
#Import data with duplicates removed
#group exercises with the same input vector
#partition for validation
#save for use in other models for consistency and comparability

import pickle

#Read the pickled, de-duplicated dataset into rm_dup
#NOTE: unpickling can execute arbitrary code - only load files you produced yourself
with open('/filepath/lhs_dataset_old_assumptions_prepped_rmdup.txt', 'rb') as dataset_file:
  rm_dup = pickle.load(dataset_file)

#Unzip data into X and Y
def unzip(dataset):
  """Split every exercise into its input sequence and its output sequence.

  Args:
    dataset: iterable of exercises, each exercise being an iterable of
      (X, Y) pairs.

  Returns:
    (dataset_X, dataset_Y): two lists with one tuple per exercise, holding
    that exercise's X values and Y values respectively, in the original order.

  Raises:
    ValueError: if an exercise is empty or its items are not 2-element pairs
      (the transposed exercise cannot be unpacked into exactly X and Y).
  """
  dataset_X, dataset_Y = [], []
  for exercise in dataset:
    #zip(*pairs) transposes [(x0, y0), (x1, y1), ...] into (x0, x1, ...) and (y0, y1, ...)
    inputs, targets = zip(*exercise)
    dataset_X.append(inputs)
    dataset_Y.append(targets)
  return dataset_X, dataset_Y

lhs_dataset_X, lhs_dataset_Y = unzip(rm_dup)

#Manual validation split - exercises sharing an identical input X must end up in the same partition
#same_X: index of the first exercise seen with a given X -> indexes of every exercise with that X (itself included)
#index_tracker: every index that has already been placed in a group
index_tracker = []
same_X = {}
n_exercises = len(lhs_dataset_X)

for first in range(n_exercises):
  if first in index_tracker:
    continue #already grouped under an earlier exercise
  later_matches = [
    later for later in range(first + 1, n_exercises)
    if lhs_dataset_X[first] == lhs_dataset_X[later]
  ]
  same_X[first] = [first] + later_matches
  index_tracker.append(first)
  index_tracker.extend(later_matches)

print(len(index_tracker)) #number of exercises that were assigned to a group
print(len(same_X)) #number of groups, i.e. of distinct input sequences

#Randomly select 20% of exercises for validation
import random

#The split is drawn over groups (keys of same_X), not single exercises, so that
#exercises with the same input never straddle train and validation.
#random.sample() needs a real sequence: passing a dict view was deprecated in
#Python 3.9 and raises TypeError from 3.11, so the keys are materialised first.
#NOTE(review): the RNG is unseeded, so re-running this cell draws a different split
#and overwrites the saved files - confirm that is intended.
group_keys = list(same_X)
val_idx = random.sample(group_keys, int(0.2*len(group_keys)))
val_lookup = set(val_idx) #constant-time membership tests in the loops below
train_idx = [key for key in group_keys if key not in val_lookup]

print(len(val_idx), len(train_idx))

train_set_X = []
train_set_Y = []
val_set_X = []
val_set_Y = []

#Walk the groups in their original order and copy every member exercise into
#the partition its group was assigned to.
for key in group_keys:
  if key in val_lookup:
    target_X, target_Y = val_set_X, val_set_Y
  else:
    target_X, target_Y = train_set_X, train_set_Y
  for member in same_X[key]:
    target_X.append(lhs_dataset_X[member])
    target_Y.append(lhs_dataset_Y[member])

#Sanity check: every exercise must have landed in exactly one partition
if len(train_set_X) + len(val_set_X) != len(lhs_dataset_X):
  print("Error - train/validation exercises not properly allocated")

#Pickle the four partitions so that other models can be trained and evaluated on exactly the same split
for split_path, split_data in (
  ('/filepath/lhs_old_assumptions_trainX.txt', train_set_X),
  ('/filepath/lhs_old_assumptions_trainY.txt', train_set_Y),
  ('/filepath/lhs_old_assumptions_valX.txt', val_set_X),
  ('/filepath/lhs_old_assumptions_valY.txt', val_set_Y),
):
  with open(split_path, 'wb') as split_file:
    pickle.dump(split_data, split_file)

In [None]:
#Load data
import pickle

#Read the saved partitions back in the order: train X, train Y, validation X, validation Y
#NOTE: unpickling can execute arbitrary code - only load files you produced yourself
loaded_splits = []
for split_path in (
  '/filepath/lhs_old_assumptions_trainX.txt',
  '/filepath/lhs_old_assumptions_trainY.txt',
  '/filepath/lhs_old_assumptions_valX.txt',
  '/filepath/lhs_old_assumptions_valY.txt',
):
  with open(split_path, 'rb') as f:
    loaded_splits.append(pickle.load(f))
train_set_X, train_set_Y, val_set_X, val_set_Y = loaded_splits

#Pad data and prepare for training
import numpy as np

from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import TimeDistributed
from keras.layers import LSTM
from keras.layers import Bidirectional
from keras.layers import Masking
from keras import losses
from keras import optimizers

from keras.callbacks import EarlyStopping

#Bring every exercise to 128 timesteps, filling with the sentinel -99 that the Masking layer is configured to skip
#NOTE(review): padding side, truncation side and dtype are left at the pad_sequences defaults -
#confirm the inputs are integer-valued and that no exercise is longer than 128 steps
pad_options = {'value': -99, 'maxlen': 128}
train_pad_X = pad_sequences(train_set_X, **pad_options)
train_pad_Y = pad_sequences(train_set_Y, **pad_options)
val_pad_X = pad_sequences(val_set_X, **pad_options)
val_pad_Y = pad_sequences(val_set_Y, **pad_options)

#The unpacking requires train_pad_X to be 3-D; the last two axes are fed to the model as (timesteps, features)
train_X_samples, timesteps, features = train_pad_X.shape

#Stop training once val_loss has gone 5 epochs without improving
es = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

#Masking skips timesteps equal to -99, a bidirectional LSTM (5 units per direction) returns
#one vector per timestep, and Dense maps each of them to a 4-way softmax
model = Sequential([
  Masking(mask_value=-99, input_shape=(timesteps, features)),
  Bidirectional(LSTM(5, return_sequences=True)),
  Dense(4, activation='softmax'),
])
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
#Fit on the padded training set and validate on the held-out split; the es callback may stop before 200 epochs
history = model.fit(
  x=train_pad_X,
  y=train_pad_Y,
  validation_data=(val_pad_X, val_pad_Y),
  batch_size=5,
  epochs=200,
  verbose=1,
  shuffle=True,
  callbacks=[es],
)

#Keep the per-epoch metrics and the trained model on disk for the evaluation cells
with open('/filepath/LSTM_PVE_S005_history.txt', 'wb') as history_file:
  pickle.dump(history.history, history_file)
model.save("/filepath/LSTM_PVE_S005")

In [None]:
##Verify learning curves
import matplotlib.pyplot as plt

#Training loss first, validation loss second, each labelled for the legend
for history_key, curve_label in (('loss', 'train'), ('val_loss', 'val')):
  plt.plot(history.history[history_key], label=curve_label)
plt.title('model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(loc='upper left')
plt.show()

In [None]:
#Prepare data for evaluation
from keras.models import load_model
from keras.preprocessing.sequence import pad_sequences
import pickle

#Reload the saved model and the saved validation split from disk
model = load_model("/filepath/LSTM_PVE_S005")

#NOTE: unpickling can execute arbitrary code - only load files you produced yourself
loaded_val = []
for val_path in (
  '/filepath/lhs_old_assumptions_valX.txt',
  '/filepath/lhs_old_assumptions_valY.txt',
):
  with open(val_path, 'rb') as f:
    loaded_val.append(pickle.load(f))
val_set_X, val_set_Y = loaded_val

#Unpadded length of each exercise, used afterwards to strip the padding from the predictions
val_lens = list(map(len, val_set_X))

#Pad with the same sentinel and length that were used for training
pad_options = {'value': -99, 'maxlen': 128}
val_pad_X = pad_sequences(val_set_X, **pad_options)
val_pad_Y = pad_sequences(val_set_Y, **pad_options)

import numpy as np

#Predict classes on the validation set
#Sequential.predict_classes() was removed in TensorFlow 2.6; taking the argmax of
#the softmax output over the class axis is the documented equivalent.
est_output = np.argmax(model.predict(val_pad_X), axis=-1)

#Remove padding in output
#The real notes sit at the end of each padded row, so keep only the last val_lens[i] entries.
#The start index is computed explicitly because a plain [-0:] slice would return
#the whole padded row for an empty exercise.
#NOTE(review): an exercise longer than the padded length keeps only padded_len
#predictions here while val_output keeps all of its notes - confirm none exceed it.
padded_len = est_output.shape[1]
masked_output = []
for i in range(len(est_output)):
  start = padded_len - min(val_lens[i], padded_len)
  #+1 shifts the 0-based class index to the 1-based labels used in val_output
  masked_output.append([1+note for note in est_output[i][start:]])

#Convert validation GT's from one-hot to numerical encoding
#(position of the 1 in each one-hot vector, shifted to start at 1)
val_output = [[note.index(1)+1 for note in ex] for ex in val_set_Y]

In [None]:
##Sequence Accuracy

#Strict approach
from keras import metrics
m = metrics.Accuracy()

#Fraction of matching notes in each exercise, scored against that exercise's own ground truth only
perseq_acc = []
for i, predicted_notes in enumerate(masked_output):
  m.reset_state() #cleared before each exercise so result() reflects this exercise only
  m.update_state(predicted_notes, val_output[i])
  perseq_acc.append(m.result().numpy())

print(sum(perseq_acc)/len(perseq_acc)) #mean of the per-exercise accuracies
print(perseq_acc)

#MGT approach
#idx_map: index of each validation exercise -> indexes of all validation exercises with the same input (itself included)
idx_map = {
  idx: [idx_2 for idx_2, ex_2 in enumerate(val_set_X) if ex == ex_2]
  for idx, ex in enumerate(val_set_X)
}

#Score every prediction against each ground truth recorded for the same input and keep the best score.
#An exercise without duplicates has only itself in idx_map, so its score is the strict accuracy.
mGT_accuracy_list = []
temp_acc = metrics.Accuracy()

for i, predicted_notes in enumerate(masked_output):
  temp_acc_list = []
  for idx in idx_map[i]:
    temp_acc.reset_state()
    temp_acc.update_state(predicted_notes, val_output[idx])
    temp_acc_list.append(temp_acc.result().numpy())
  mGT_accuracy_list.append(max(temp_acc_list))

print(sum(mGT_accuracy_list)/len(mGT_accuracy_list))
print(mGT_accuracy_list)


In [None]:
##Note Accuracy:

#Strict approach
from keras import metrics
m = metrics.Accuracy()

#The metric is never reset inside the loop, so it pools the notes of all exercises into one accuracy
for i, predicted_notes in enumerate(masked_output):
  m.update_state(predicted_notes, val_output[i])
print(m.result().numpy())

#MGT approach
#idx_map: index of each validation exercise -> indexes of all validation exercises with the same input (itself included)
idx_map = {}
for idx, ex in enumerate(val_set_X):
  idx_map[idx] = [idx_2 for idx_2, ex_2 in enumerate(val_set_X) if ex == ex_2]

#mGT_accuracy_list holds one entry per note: the best accuracy of the exercise the note belongs to.
#Averaging it therefore weights every exercise by its number of notes.
mGT_accuracy_list = []
temp_acc = metrics.Accuracy()

for i in range(len(masked_output)):
  ex_length = len(masked_output[i])
  temp_acc_list = []
  #Score the prediction against every ground truth that shares its input
  for idx in idx_map[i]:
    temp_acc.reset_state()
    temp_acc.update_state(masked_output[i], val_output[idx])
    temp_acc_list.append(temp_acc.result().numpy())
  #Every exercise must contribute ex_length entries, whether it has one ground
  #truth or several; otherwise exercises with several would count as a single note.
  mGT_accuracy_list.extend([max(temp_acc_list)] * ex_length)

print(sum(mGT_accuracy_list)/len(mGT_accuracy_list))