# First learning attempts
### using Beethoven dataset
* 29 pieces + transpositions across 2 octaves
* ~70h of music (2.7h per transposition)
* 0.025s resolution (40fps)

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Notebook-wide switches read by the data pipeline and the model set-up.
IGNORE_NOTE_VELOCITY = True  # keep only the first 128 columns of every track
TARGET_IS_SEQUENCE = True    # targets are the input window shifted by one frame
MAX_WINDOW_SIZE = 100        # upper bound for sampled window lengths / generation window

# loading data file names
import os

path = '/content/drive/My Drive/datasets/beethoven100ms/transposed/'
# Match on the real file extension. A plain substring test would also accept
# names such as 'notes.npz.bak' or 'csv_readme.txt'.
DATA_EXTENSIONS = ('.npz', '.npy', '.csv')
file_names = [fn for fn in os.listdir(path) if fn.endswith(DATA_EXTENSIONS)]
assert len(file_names) > 0, 'Data not found'

# Last expression of the cell: shown as the cell output in the notebook.
f'Found {len(file_names)} files'

In [None]:
# loading data files
def read_numpy_midi(input_path):
    """Load one track file from disk as a dense NumPy array.

    The loader is chosen from the file extension:

    * ``.csv`` -- comma separated text, returned as ``int32``
    * ``.npy`` -- NumPy array file, returned as ``float32``
    * ``.npz`` -- SciPy sparse matrix file, densified and returned as ``float32``

    Args:
        input_path: path of the file to read (a ``str``).

    Returns:
        The file content as a ``numpy.ndarray``.

    Raises:
        ValueError: if the path ends with none of the supported extensions.
    """
    import numpy as np
    from scipy import sparse

    # Dispatch on the suffix only. A substring test on the whole path picks
    # the wrong loader as soon as a directory or file name happens to contain
    # 'csv', 'npy' or 'npz' (e.g. 'csv_export/track.npy').
    if input_path.endswith('.csv'):
        return np.loadtxt(input_path, delimiter=",", dtype=np.int32)
    if input_path.endswith('.npy'):
        return np.load(input_path).astype(np.float32)
    if input_path.endswith('.npz'):
        sparse_numpy = sparse.load_npz(input_path)
        return sparse_numpy.toarray().astype(np.float32)
    # Fail loudly instead of returning None, which would only blow up later
    # when the caller tries to slice the "track".
    raise ValueError(f'Unsupported track file type: {input_path!r}')

# Full path of every data file: the directory prefix followed by the file name.
file_paths = [path + fn for fn in file_names]

from random import choice
def load_tracks(n):
    """Read ``n`` track files picked at random (with replacement).

    Every pick is an independent ``choice`` from ``file_paths``, so the same
    file may be loaded more than once. When ``IGNORE_NOTE_VELOCITY`` is set,
    each track is cut down to its first 128 columns.

    Returns:
        A list with ``n`` arrays, one per picked file.
    """
    print('loading tracks')
    # Draw all paths first, then read them in that order.
    picked_paths = []
    for _ in range(n):
        picked_paths.append(choice(file_paths))
    loaded = list(map(read_numpy_midi, picked_paths))
    if not IGNORE_NOTE_VELOCITY:
        return loaded
    return [track[:, :128] for track in loaded]

In [None]:
# select batch
def create_batch(data, batch_size, seq_length, target_is_sequence=None):
    """Sample a random batch of (input, target) windows from ``data``.

    Args:
        data: sequence of tracks; each track is an array indexed by frame
            along its first axis.
        batch_size: number of windows to sample.
        seq_length: number of frames in every input window.
        target_is_sequence: if true, the target is the input window shifted
            one frame ahead; if false, the target is the single frame that
            follows the input window. ``None`` (default) falls back to the
            module-level ``TARGET_IS_SEQUENCE``.

    Returns:
        ``(x, y)`` where ``x`` stacks the input windows
        (``[batch_size, seq_length, ...]``) and ``y`` stacks the targets
        (``[batch_size, seq_length, ...]`` for sequence targets, otherwise
        ``[batch_size, ...]``).

    Raises:
        ValueError: if a selected track has fewer than ``seq_length + 1``
            frames, i.e. no window plus target fits into it.
    """
    if target_is_sequence is None:
        target_is_sequence = TARGET_IS_SEQUENCE

    # each sequence may come from a different track (drawn with replacement)
    track_indices = np.random.randint(0, len(data), batch_size)

    # select one window start inside each selected track
    seq_indices = []
    for ti in track_indices:
        # A window needs seq_length frames plus one extra frame for the
        # target, so the valid starts are 0 .. len - seq_length - 1.
        # randint's upper bound is exclusive, hence `len - seq_length`.
        start_count = len(data[ti]) - seq_length
        if start_count < 1:
            raise ValueError(
                f'track {ti} has {len(data[ti])} frames, '
                f'need at least {seq_length + 1}'
            )
        seq_indices.append(np.random.randint(0, start_count))

    # transform start indices into the actual input / target slices
    xs = []
    ys = []
    for ti, si in zip(track_indices, seq_indices):
        track = data[ti]
        xs.append(track[si:si + seq_length])
        if target_is_sequence:
            ys.append(track[si + 1:si + seq_length + 1])
        else:
            ys.append(track[si + seq_length])
    return np.stack(xs), np.stack(ys)

In [None]:
# dataset generator
import numpy as np
def data_gen(batch_size, seq_len, track_count=25):
    """Endless generator of ``(x, y)`` batches.

    Loads ``track_count`` tracks with ``load_tracks``, yields
    ``1000 * len(tracks)`` batches from them, then loads a fresh set of
    tracks and starts over. The window length of every batch is drawn anew
    from ``[seq_len[0], seq_len[1])``.

    Each batch comes from ``create_batch``: ``x`` has shape
    ``[batch_size, sequence_len, input_dim]``; the shape of ``y`` depends on
    ``TARGET_IS_SEQUENCE``.
    """
    while True:
        print('reloading data')
        tracks = load_tracks(track_count)
        print('reloaded data')
        batches_left = 1000 * len(tracks)
        while batches_left > 0:
            window = np.random.randint(seq_len[0], seq_len[1])
            yield create_batch(tracks, batch_size, window)
            batches_left -= 1

## Setting up model

In [None]:
from tensorflow import keras as K

# Width of one input frame: 128 columns when velocities are ignored, 256 otherwise.
INPUT_SIZE = 128 if IGNORE_NOTE_VELOCITY else 256
# Size handed to the recurrent layer when the model is built.
HIDDEN_SIZE = 512
# The model outputs a frame of the same width as its input.
OUTPUT_SIZE = INPUT_SIZE

# Number of sequences per batch.
BATCH_SIZE = 16

INPUT_SHAPE = (None, INPUT_SIZE)
# None means that the sequence length is not fixed.
# The batch dimension is not listed here; it is defined implicitly.

In [91]:
def PenalizeFalseNegatives(penalty_factor, lossF):
    """Wrap ``lossF`` so that false negatives scale the loss up.

    The returned function evaluates ``lossF(y_true, y_pred)`` and multiplies
    it by ``(1 + penalty_factor) ** n``, where ``n`` is the number of
    positions with ``y_true == 1`` whose rounded prediction is 0.
    """
    backend = K.backend

    def inner(y_true, y_pred):
        base_loss = lossF(y_true, y_pred)
        hard_pred = backend.round(y_pred)
        # 1 where the truth is 1 and the rounded prediction is 0, else 0
        missed = backend.clip(y_true - hard_pred, 0, 1)
        # number of false negatives in the whole tensor
        missed_count = backend.sum(missed)
        scale = backend.pow(1. + penalty_factor, missed_count)
        return base_loss * scale

    return inner

In [94]:
# Sanity check of the penalised loss on a toy example: two of the three
# positions are false negatives (truth 1, prediction 0), so the binary
# cross-entropy is scaled by (1 + 0.1) ** 2.
y_true = np.array([1,0,1])
y_pred = np.array([0,0,0])
l = PenalizeFalseNegatives(0.1, K.losses.BinaryCrossentropy())
r = l(K.backend.variable(y_true), K.backend.variable(y_pred))
# Last expression of the cell: evaluates the loss tensor so the notebook shows its value.
K.backend.eval(r)

13.00193

In [None]:
# One LSTM layer followed by a sigmoid Dense layer of OUTPUT_SIZE units.
model = K.models.Sequential()
model.add(
    K.layers.LSTM(
        HIDDEN_SIZE,
        input_shape=INPUT_SHAPE,
        return_sequences=TARGET_IS_SEQUENCE,
    )
)
model.add(K.layers.Dense(OUTPUT_SIZE, activation='sigmoid'))

# Alternative loss kept for reference:
#     loss=PenalizeFalseNegatives(0.1, K.losses.BinaryCrossentropy())
model.compile(
    optimizer=K.optimizers.Adam(),
    loss='binary_crossentropy',
    metrics=[
        K.metrics.BinaryAccuracy(),
        K.metrics.FalsePositives(),
        K.metrics.FalseNegatives(),
    ],
)

In [None]:
# or load saved model
# Rebinds `model` to a checkpoint read from Drive, replacing any model built earlier.
# NOTE(review): a checkpoint compiled with a custom loss (e.g. PenalizeFalseNegatives)
# presumably needs `custom_objects=` passed to load_model -- confirm before relying on it.
base_path = '/content/drive/My Drive/datasets/'
file_name = 'heck_512_all/m1559929134.h5'
model = K.models.load_model(base_path + file_name)

In [None]:
# pre running operations
# some stat data accumulators for re-running the model
from time import time
epochs_elapsed = 0   # total epochs trained so far; incremented by the training cell
minutes_elapsed = 0  # total training time in minutes; incremented by the training cell
# Two independent endless batch generators: one for training, one for validation.
# Window lengths are drawn from [20, MAX_WINDOW_SIZE).
gen = data_gen(BATCH_SIZE, seq_len=(20, MAX_WINDOW_SIZE), track_count=100)
test_gen = data_gen(BATCH_SIZE, seq_len=(20, MAX_WINDOW_SIZE), track_count=10)

### Running model

In [None]:
EPOCHS = 1
STEPS_PER_EPOCH = 1000
start_time = time()

# Model.fit accepts Python generators directly. Model.fit_generator is
# deprecated in TensorFlow 2.x and no longer exists in current Keras, so the
# training call goes through fit().
# Both generators are endless, hence the explicit step counts.
model.fit(
    gen,
    steps_per_epoch=STEPS_PER_EPOCH,
    epochs=EPOCHS,
    validation_data=test_gen,
    validation_steps=100,
    callbacks=[CustomCallback(600, 16)]
)

# Accumulate run statistics across re-runs of this cell.
# Whole minutes only: the floor division drops the leftover seconds of each run.
minutes_elapsed += (time() - start_time) // 60
epochs_elapsed += EPOCHS

### testing/plotting

In [None]:
def generate(seq_length, batch_size=1, window_len=MAX_WINDOW_SIZE):
    """Generate ``seq_length`` new frames with ``model``, one frame per step.

    Starts from a random seed window with values in ``[0, 0.5)`` and
    repeatedly feeds the last ``window_len`` frames to ``model.predict``,
    appending the rounded prediction for the next frame.

    Args:
        seq_length: number of frames to generate.
        batch_size: number of independent sequences generated in parallel.
        window_len: number of most recent frames fed to the model at every
            step; also the length of the random seed.

    Returns:
        Rounded array made of the seed followed by the generated frames,
        concatenated along axis 1 (``window_len + seq_length`` frames per
        sequence).
    """
    # sequence shape is [batch_size, sequence_length, input_size]
    # The seed and the sliding window both honour `window_len`; previously the
    # parameter was ignored in favour of the global MAX_WINDOW_SIZE.
    seed = np.random.random((batch_size, window_len, INPUT_SIZE)) * 0.5
    x = seed
    accum = [seed]
    for i in range(seq_length):
        print(i, end=',')
        res = model.predict(x).round()
        if TARGET_IS_SEQUENCE:
            # the model returns a whole sequence; only its last frame is new
            next_frame = res[:, -1:, :]
        else:
            # the model returns a single frame; give it a time axis
            next_frame = res[:, np.newaxis, :]
        # next input: previous input with the new frame attached to the end,
        # then limited to the window size
        x = np.concatenate([x, next_frame], axis=1)[:, -window_len:, :]
        accum.append(next_frame)
    return np.concatenate(accum, axis=1).round()

In [None]:
import matplotlib.pyplot as plt
def generate_plot(length=300, batch_size=16):
    """Generate ``batch_size`` sequences and show them as a grid of images.

    Args:
        length: number of frames to generate per sequence.
        batch_size: number of sequences to generate and plot.

    Returns:
        The matplotlib figure holding the grid, so the caller can save it.
    """
    x = generate(length, batch_size)
    # Four images per row and as many rows as needed. A fixed 4x4 grid would
    # silently drop every sample after the 16th. For 16 samples this is the
    # same 4x4 layout with a (30, 10) figure.
    ncols = 4
    nrows = max(1, -(-len(x) // ncols))  # ceiling division
    fig, axs = plt.subplots(nrows=nrows, ncols=ncols,
                            figsize=(30, 2.5 * nrows),
                            squeeze=False,  # always a 2-D array of axes
                            subplot_kw={'xticks': [], 'yticks': []})
    for ax, x_ in zip(axs.flat, x):
        # transpose so frames run along the horizontal axis, then flip the
        # rows so that column 0 of the data ends up at the bottom
        ax.imshow(x_.T[::-1, :])
    plt.tight_layout()
    plt.show()
    return fig

In [None]:
# wrapping it up in callback
import json
from time import time

def default(val):
    """Fallback serializer for ``json.dumps(..., default=default)``.

    Converts NumPy values, which ``json`` cannot encode on its own, to plain
    Python objects: any NumPy scalar (``np.float32``, ``np.int64``, ...)
    becomes the matching Python scalar and arrays become nested lists.

    Raises:
        TypeError: for every other type, as ``json`` expects from a
            ``default`` hook.
    """
    if isinstance(val, np.generic):
        # .item() yields the closest built-in Python scalar
        return val.item()
    if isinstance(val, np.ndarray):
        return val.tolist()
    raise TypeError(
        f'Object of type {type(val).__name__} is not JSON serializable'
    )

class CustomCallback(K.callbacks.Callback):
    """Plot generated samples after every epoch and optionally checkpoint.

    After each epoch a grid of generated sequences is plotted with
    ``generate_plot``. When ``file_path`` is non-empty, the plot, the model
    and the epoch logs are additionally written to files whose names start
    with ``file_path``.

    Args:
        plot_length: number of frames to generate for the plot.
        batch_size: number of sequences to generate for the plot.
        file_path: prefix for all output files; ``''`` disables file output.
    """

    def __init__(self, plot_length, batch_size, file_path=''):
        super().__init__()
        self.plot_length = plot_length
        self.batch_size = batch_size
        # file output is enabled only when a path prefix was given
        self.output = file_path != ''
        self.file_path = file_path
        # JSON lines not yet written to log.txt
        self.log = ''

    def _flush_log(self):
        """Append the buffered log lines to ``log.txt`` and clear the buffer."""
        if not self.log:
            return
        with open(self.file_path + 'log.txt', 'a+') as fo:
            fo.write(self.log)
        self.log = ''

    def on_epoch_end(self, epoch, logs=None):
        """Plot samples, record the epoch logs and write outputs if enabled."""
        plot = generate_plot(self.plot_length, self.batch_size)
        # Record this epoch BEFORE flushing. Appending after the write left
        # the newest entry in memory, so the last epoch never reached the file.
        self.log += json.dumps(logs or {}, default=default) + '\n'
        if self.output:
            t = int(time())
            plot.savefig(self.file_path + f'{t}.png')
            # self.model is the model Keras attached to this callback, i.e.
            # the one being trained.
            K.models.save_model(self.model, self.file_path + f'm{t}.h5')
            self._flush_log()

    def on_train_end(self, logs=None):
        """Write out any log lines that are still buffered."""
        if self.output:
            self._flush_log()

### Saving model

In [None]:
# Build a descriptive file name from the run keywords, the hidden size and
# the epoch / minute counters accumulated by the training cell, then save
# the current model under base_path.
base_path = '/content/drive/My Drive/datasets/'
keywords = '_'.join(['beth', 'notransp', 'randchunk'])
file_name = f'{keywords}_{HIDDEN_SIZE}_{epochs_elapsed}epochs_{minutes_elapsed}m.h5'

K.models.save_model(model, base_path + file_name)