In [None]:
import sys

# This for managing relative imports from nb
if '..' not in sys.path: sys.path.append('..')
    
import numpy as np
from scipy.io import wavfile

import matplotlib
import matplotlib.pyplot as plt

import re, mido
from mido import MidiFile

from mwriter.musicfactory import MusicFactory
from mwriter.abstractfactory import MLDataSet

import warnings
warnings.filterwarnings("ignore") # Just supress "WavFileWarning: Chunk (non-data) not understood, skipping it."

In [None]:
# Tempo (beats per minute) used for every tick <-> second conversion in this notebook.
DEFAULT_BPM = 120
# Shared project helper; used below for perform_fft() and format_spectral_data().
FACTORY = MusicFactory()

def get_note_strike_times(midi_file):
#{
    """Collect the onset time, in seconds, of every struck note in a one-track MIDI file.

    midi_file -- object exposing `tracks`, `ticks_per_beat` and `filename`
                 (a mido MidiFile in this notebook).

    Message delta times are accumulated over the single track and converted
    to seconds with a fixed tempo derived from DEFAULT_BPM; tempo information
    stored in the file itself is not consulted.

    Prints a one-line summary and returns the list of strike times.
    Raises AssertionError if the file does not hold exactly one track.
    """
    usec_per_beat = mido.bpm2tempo(DEFAULT_BPM)

    assert len(midi_file.tracks) == 1, f"MIDI must have 1 track, not: {len(midi_file.tracks)}"

    onsets_sec = []
    elapsed_ticks = 0
    for msg in midi_file.tracks[0]:
    #{
        # Delta times of *all* messages count towards the running position.
        elapsed_ticks += msg.time

        # A note_on with zero velocity is a release, not a strike.
        if msg.type != 'note_on' or msg.velocity <= 0:
            continue

        onsets_sec.append(mido.tick2second(elapsed_ticks, midi_file.ticks_per_beat, usec_per_beat))
    #}

    print(f"Found {len(onsets_sec)} note strikes in MIDI file: {midi_file.filename}")

    return onsets_sec
#}

def generate_contiguous_labels(midi_filename, m, sec_per_step):
#{
    """Build a length-m binary vector marking the steps at which a note is struck.

    midi_filename -- passed straight to get_note_strike_times(); despite the
                     name, the callers in this notebook pass a MidiFile object.
    m             -- number of time steps in the returned vector.
    sec_per_step  -- duration of one time step in seconds.

    Each strike time is truncated to a step index and that entry is set to 1.
    A strike whose index is >= m raises IndexError.
    """
    labels = np.zeros((m,))

    for onset_sec in get_note_strike_times(midi_filename):
        step_index = int(onset_sec / sec_per_step)
        labels[step_index] = 1

    return labels
#}

def generate_windowed_labels(midi_filename, m, time_batch, sec_per_step, stride=1):
#{
    """Slice the contiguous strike labels into overlapping windows.

    The contiguous label vector is generated with `time_batch` extra trailing
    steps so that a window starting near step m still has `time_batch` entries.
    Windows start at 0, stride, 2*stride, ... (all starts < m).

    Returns an array of shape (number_of_windows, time_batch).
    """
    padded_labels = generate_contiguous_labels(midi_filename, m + time_batch, sec_per_step)

    window_starts = np.arange(0, m, stride)
    windows = np.zeros((len(window_starts), time_batch))

    for row, start in enumerate(window_starts):
        windows[row, :] = padded_labels[start:start + time_batch]

    return windows
#}

In [None]:
# Read wav files (for now discard the 2nd stereo track).
rate_train, data_train = wavfile.read('../data/audio/88_Key_Ascending_Chromatic_Scale.wav')
rate_val, data_val = wavfile.read('../data/audio/88_Key_Descending_Chromatic_Scale.wav')

# wavfile.read returns a 1-D array for mono files, so only pick a channel when there is a channel axis.
if data_train.ndim > 1: data_train = data_train[:,0]
if data_val.ndim > 1: data_val = data_val[:,0]

assert rate_train == rate_val, f"rate_train: {rate_train} must equal rate_val: {rate_val}"

# Init NFFT for the minimum piano semitone:
# b/w A#0 and A0 (29.135 - 27.5 = 1.635Hz)
NFFT = int(rate_train / 1.635)
FREQ_RESOLUTION = rate_train / NFFT
overlap = int(NFFT/2)

# Perform FFTs, window of interest: principal piano freqs: A0 to C8 (27.5 to 4186.0 Hz)
spectral_train, fft_imin_train, fft_imax_train, sec_per_step_train = FACTORY.\
    perform_fft(data_train, rate_train, NFFT, overlap, freq_window=(27.5, 4186.0))

spectral_val, fft_imin_val, fft_imax_val, sec_per_step_val = FACTORY.\
    perform_fft(data_val, rate_val, NFFT, overlap, freq_window=(27.5, 4186.0))

# Network inputs
m = len(spectral_train)                    # Number of training time steps (also used as batch size later)
m_val = len(spectral_val)                  # Number of validation time steps (may differ from m)
n_freq = fft_imax_train - fft_imin_train   # Number of frequencies per sample (over window)
time_batch = 1                             # One time step per sample, i.e. no time batching yet

# Reshape FFT data for DenseNet
X_train = FACTORY.format_spectral_data(spectral_train, time_batch, (fft_imin_train, fft_imax_train))
X_val = FACTORY.format_spectral_data(spectral_val, time_batch, (fft_imin_val, fft_imax_val))
print(f"Input data shape, train: {X_train.shape}, validate: {X_val.shape}")

# Network output will be a time_batch set of binary outs:
# equal True if note struck on that step, False otherwise

# Generate matching label dataset
midi_train = MidiFile('../data/midi_cleaned/88_Key_Ascending_Chromatic_Scale.mid')
midi_val = MidiFile('../data/midi_cleaned/88_Key_Descending_Chromatic_Scale.mid')

# Each label vector is sized from its own spectrogram so that it lines up with
# the matching X_* array even when the two recordings differ in length.
# NOTE(review): assumes format_spectral_data() yields one row per spectrum - confirm.
Y_train = generate_contiguous_labels(midi_train, m, sec_per_step_train)
Y_val = generate_contiguous_labels(midi_val, m_val, sec_per_step_val)
print(f"Output data shape, train: {Y_train.shape}, validate: {Y_val.shape}")

In [None]:
def find_power_spikes(signal_list):
#{
    """Flag the local maxima of a 1-D signal.

    signal_list -- sequence of numbers (e.g. total power per time step).

    Entry p of the result is 1 when the signal rose into step p and falls
    after it (slope before > 0, slope after < 0), otherwise 0. The slope
    "before" the first sample is taken to be positive, so the first sample is
    flagged whenever the signal immediately decreases. The last sample is
    never flagged.

    Returns an integer array with exactly one entry per input sample; an
    empty input yields an empty array (previously a stray single 0 was
    returned, which did not match the input length).
    """
    # Float conversion keeps the differences signed even for unsigned inputs.
    signal = np.asarray(signal_list, dtype=float)
    if signal.size == 0:
        return np.zeros(0, dtype=int)

    slopes = np.diff(signal)                              # slope leaving each sample
    prev_slopes = np.concatenate(([1.0], slopes))[:-1]    # slope arriving at each sample

    spikes = np.zeros(signal.shape[0], dtype=int)
    spikes[:-1] = (slopes < 0) & (prev_slopes > 0)
    return spikes
#}

# Builds a two-feature input (power-spike flag, normalised mean power) per time window.
def format_spike_mean_data(spectral_list, time_batch, freq_window):
#{
    """Turn a list of spectra into an array of shape (m, 2, time_batch).

    spectral_list -- one spectrum per time step.
    time_batch    -- number of consecutive steps in each sample window.
    freq_window   -- (first_index, end_index) slice applied to every spectrum.

    Feature 0 is the 0/1 flag from find_power_spikes() on the summed power,
    feature 1 is the mean power divided by its standard deviation over all
    steps (so it is not bounded to [0, 1]). Windows that run past the last
    step are zero padded.
    """
    n_steps = len(spectral_list)
    lo = freq_window[0]
    hi = freq_window[1]

    band_mean = [np.mean(spectrum[lo:hi]) for spectrum in spectral_list]
    band_total = [np.sum(spectrum[lo:hi]) for spectrum in spectral_list]

    # Column 0: spike flags, column 1: std-normalised mean power
    mean_column = np.reshape(band_mean / np.std(band_mean), (-1, 1))
    spike_column = np.reshape(find_power_spikes(band_total), (-1, 1))
    features = np.concatenate((spike_column, mean_column), axis=1)

    # Append time_batch rows of zeros so that every window is full length
    tail_padding = np.zeros((time_batch, features.shape[1]))
    features = np.concatenate((features, tail_padding), axis=0)

    windows = np.zeros((n_steps, 2, time_batch))
    for step in range(n_steps):
        windows[step, :, :] = features[step:step + time_batch, :].T

    return windows
#}

# Build the two-feature (spike flag, normalised mean power) inputs for both data sets
# and report their shapes next to the current label shapes.
X_mean_train = format_spike_mean_data(spectral_train, time_batch, (fft_imin_train, fft_imax_train))
X_mean_val = format_spike_mean_data(spectral_val, time_batch, (fft_imin_val, fft_imax_val))
print(f"Input data shape, train: {X_mean_train.shape}, validate: {X_mean_val.shape}")
print(f"Output data shape, train: {Y_train.shape}, validate: {Y_val.shape}")

In [None]:
def generate_power_spike_labels(spectral_list, freq_window):
#{
    """Derive binary labels from the audio itself: 1 where total band power peaks.

    spectral_list -- one spectrum per time step.
    freq_window   -- (first_index, end_index) slice applied to every spectrum.

    Returns an array of shape (len(spectral_list), 1) holding the flags
    produced by find_power_spikes() on the per-step summed power.
    """
    lo = freq_window[0]
    hi = freq_window[1]

    band_total = [np.sum(spectrum[lo:hi]) for spectrum in spectral_list]

    labels = np.zeros((len(spectral_list), 1))
    labels[:, 0] = find_power_spikes(band_total)

    return labels
#}

# NOTE: this replaces the MIDI-derived Y_train / Y_val assigned earlier with labels
# computed from the audio power peaks; every later cell sees these labels.
Y_train = generate_power_spike_labels(spectral_train, (fft_imin_train, fft_imax_train))
Y_val = generate_power_spike_labels(spectral_val, (fft_imin_val, fft_imax_val))
print(f"Output data shape, train: {Y_train.shape}, validate: {Y_val.shape}")

In [None]:
import keras
from keras.backend import squeeze
from keras.layers import Input, Conv1D, Conv2D, Activation, MaxPooling2D, GRU, Dropout
from keras.layers import TimeDistributed, Flatten, Dense, BatchNormalization, Bidirectional

In [None]:
def create_dense_network(n_freq):
#{
    """Build (and print a summary of) a small convolution + dense binary classifier.

    n_freq -- length of the frequency axis; the model input has shape (n_freq, 1).

    Layer stack: Conv1D(128, kernel 16, stride 4) -> BatchNormalization -> ReLU
    -> Dense(64, relu) -> Dense(1, relu) -> Flatten -> Dense(1, sigmoid).
    No dropout is applied. Returns the uncompiled keras Model.
    """
    # Input: one spectrum per sample
    inputs = Input(shape=(n_freq, 1))

    # Strided convolution shrinks the large frequency axis
    hidden = Conv1D(filters=128, kernel_size=16, strides=4)(inputs)
    hidden = BatchNormalization()(hidden)
    hidden = Activation('relu')(hidden)

    # Two dense layers applied along the remaining frequency positions
    hidden = Dense(units=64, activation='relu')(hidden)
    hidden = Dense(units=1, activation='relu')(hidden)

    # Collapse to 1D and emit a single probability
    hidden = Flatten()(hidden)
    outputs = Dense(units=1, activation="sigmoid")(hidden)

    network = keras.models.Model(inputs=inputs, outputs=outputs)
    network.summary()

    return network
#}

In [None]:
# Generate model and configure for training
model = create_dense_network(n_freq)
# NOTE(review): `lr` and `decay` are the older Adam argument names; newer Keras
# releases expect `learning_rate` - confirm against the installed version.
opt = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, decay=0.01)
# Stop when the validation loss has not improved for 5 consecutive epochs.
early_stop = [keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)]
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['binary_accuracy'])

In [None]:
# Train model
# batch_size=m feeds all m training steps as a single batch (presumably one
# weight update per epoch - assumes X_train has m rows).
history = model.fit(X_train, Y_train, batch_size=m, validation_data=(X_val, Y_val), callbacks=early_stop, epochs=100)

In [None]:
# List all data in history
print(history.history.keys())

# Plot with respect to accuracy
# (figure 1: training vs. validation binary accuracy per epoch)
plt.figure(1)
plt.plot(history.history['binary_accuracy'])
plt.plot(history.history['val_binary_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

# Plot with respect to loss
# (figure 2: training vs. validation loss per epoch)
plt.figure(2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

In [None]:
# Evaluate model
# Predictions are rounded to 0/1 so they can be compared element-wise with the labels.
output = model.predict(X_train)
output = np.round(output)

# Bring the predictions to the label layout before comparing.
if output.shape != Y_train.shape:
    output = np.reshape(output, Y_train.shape)

# print(np.concatenate((output, Y_train, np.equal(Y_train, output)), axis=1))

total_pred_correct = np.sum(np.equal(Y_train, output))
total_pred_positive = np.sum(np.logical_and(output, 1))
true_positive = np.sum(np.logical_and(Y_train, output))
# Label is 1 where prediction and label disagree -> missed strike.
false_negative = np.sum(np.logical_and(np.logical_xor(Y_train, output), Y_train))

# Accuracy is divided by m (number of training steps).
# Precision / recall divide by counts that are 0 when nothing is predicted / labelled positive.
print("Model prediction accuracy:", total_pred_correct/m )
print("Model prediction precision:", true_positive/total_pred_positive )
print("Model prediction recall:", true_positive/(true_positive + false_negative), "\n")

In [None]:
def create_dense_network_tbatch(n_freq, time_batch):
#{
    """Build (and print a summary of) the dense classifier for time-batched input.

    n_freq     -- length of the first input axis.
    time_batch -- length of the second input axis (steps per sample).

    Same stack as create_dense_network(), except that the convolution kernel is
    capped at n_freq (min(n_freq, 16)) and a Dropout(0.2) follows the
    convolution block and each of the two hidden dense layers.
    Returns the uncompiled keras Model.
    """
    drop_rate = 0.2

    # Input: (n_freq, time_batch) per sample
    inputs = Input(shape=(n_freq, time_batch))

    # Strided convolution; kernel never wider than the input axis
    hidden = Conv1D(filters=128, kernel_size=min(n_freq, 16), strides=4)(inputs)
    hidden = BatchNormalization()(hidden)
    hidden = Activation('relu')(hidden)
    hidden = Dropout(drop_rate)(hidden)

    # First dense layer
    hidden = Dense(units=64, activation='relu')(hidden)
    hidden = Dropout(drop_rate)(hidden)

    # Second dense layer
    hidden = Dense(units=1, activation='relu')(hidden)
    hidden = Dropout(drop_rate)(hidden)

    # Collapse to 1D and emit a single probability
    hidden = Flatten()(hidden)
    outputs = Dense(units=1, activation="sigmoid")(hidden)

    network = keras.models.Model(inputs=inputs, outputs=outputs)
    network.summary()

    return network
#}

In [None]:
# Generate model and configure for training
# The first argument is 2 because X_mean_* carries two features per step
# (spike flag and normalised mean power).
model = create_dense_network_tbatch(2, time_batch)
# NOTE(review): `lr` and `decay` are the older Adam argument names; newer Keras
# releases expect `learning_rate` - confirm against the installed version.
opt = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, decay=0.01)
# Stop when the validation loss has not improved for 5 consecutive epochs.
early_stop = [keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)]
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['binary_accuracy'])

In [None]:
# Train on the spike/mean features, again with all m steps in one batch.
# NOTE(review): if Y_train still holds the power-spike labels, feature 0 of
# X_mean_train is the same spike flag as the label - confirm this is intended.
history = model.fit(X_mean_train, Y_train, batch_size=m, validation_data=(X_mean_val, Y_val), callbacks=early_stop, epochs=100)

In [None]:
# List all data in history
print(history.history.keys())

# Plot with respect to accuracy
# (figure 1: training vs. validation binary accuracy per epoch)
plt.figure(1)
plt.plot(history.history['binary_accuracy'])
plt.plot(history.history['val_binary_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

# Plot with respect to loss
# (figure 2: training vs. validation loss per epoch)
plt.figure(2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

In [None]:
# Reshape FFT data for time batched SequenceNet
# This overwrites time_batch, X_train and X_val from the earlier dense-network cells.
time_batch = 10  # 10 steps per sample (original note: 10 steps == 307.46ms)
X_train = FACTORY.format_spectral_data(spectral_train, time_batch, (fft_imin_train, fft_imax_train))
X_val = FACTORY.format_spectral_data(spectral_val, time_batch, (fft_imin_val, fft_imax_val))
print(f"Input data shape, train: {X_train.shape}, validate: {X_val.shape}")

# The MIDI-derived labels are left disabled below, so Y_train / Y_val keep
# whatever an earlier cell assigned last.
# Y_train = generate_contiguous_labels(midi_train, m, sec_per_step_train)
# Y_val = generate_contiguous_labels(midi_val, m, sec_per_step_val)
print(f"Output data shape, train: {Y_train.shape}, validate: {Y_val.shape}")

In [None]:
def create_sequential_network(n_freq, time_batch):
#{
    """Build (and print a summary of) a two-layer bidirectional GRU binary classifier.

    n_freq     -- length of the first input axis.
    time_batch -- length of the second input axis.

    Layer stack: Bidirectional GRU(10, sequences) -> BatchNormalization ->
    Bidirectional GRU(10, last state only) -> BatchNormalization ->
    Dense(1, sigmoid), i.e. one probability per sample.
    Returns the uncompiled keras Model.

    NOTE(review): with Input(shape=(n_freq, time_batch)) the recurrent layers
    presumably step along the n_freq axis - confirm this matches the layout
    returned by format_spectral_data.
    """
    gru_units = 10

    # Input: (n_freq, time_batch) per sample
    inputs = Input(shape=(n_freq, time_batch))

    # First recurrent layer keeps the whole sequence for the next one
    hidden = Bidirectional(GRU(gru_units, return_sequences=True))(inputs)
    hidden = BatchNormalization()(hidden)

    # Second recurrent layer keeps only its final state
    hidden = Bidirectional(GRU(gru_units, return_sequences=False))(hidden)
    hidden = BatchNormalization()(hidden)

    # Single sigmoid unit: binary output per sample
    outputs = Dense(units=1, activation="sigmoid")(hidden)

    network = keras.models.Model(inputs=inputs, outputs=outputs)
    network.summary()

    return network
#}

In [None]:
# Generate model and configure for training
model = create_sequential_network(n_freq, time_batch)
# NOTE(review): `lr` and `decay` are the older Adam argument names; newer Keras
# releases expect `learning_rate` - confirm against the installed version.
opt = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, decay=0.01)
# Stop when the validation loss has not improved for 5 consecutive epochs.
early_stop = [keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)]
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['binary_accuracy'])

In [None]:
# Train the recurrent model with mini-batches of 5 samples for at most 10 epochs.
history = model.fit(X_train, Y_train, batch_size=5, validation_data=(X_val, Y_val), callbacks=early_stop, epochs=10)

In [None]:
# List all data in history
print(history.history.keys())

# Plot with respect to accuracy
# (figure 1: training vs. validation binary accuracy per epoch)
plt.figure(1)
plt.plot(history.history['binary_accuracy'])
plt.plot(history.history['val_binary_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

# Plot with respect to loss
# (figure 2: training vs. validation loss per epoch)
plt.figure(2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'validate'], loc='upper left')

In [None]:
# NOTE(review): not referenced by any code visible in this notebook - confirm whether it is still needed.
FLIP_TOLERANCE = 0.5

def output_midi_file(nn_output, filename, sec_per_step):
#{
    """Write the predicted note strikes to a MIDI file next to the original track.

    nn_output    -- array of per-step predictions; a value of 1 means "note struck".
    filename     -- output path; the reference MIDI is read from the same path
                    with 'midi_output' replaced by 'midi_cleaned'.
    sec_per_step -- duration of one prediction step in seconds.

    The saved file has two tracks: track 0 is the first track of the reference
    file, track 1 holds one fixed note (pitch 60, velocity 64, held 200 ticks)
    per predicted strike. A strike that does not fall after the end of the
    previous note is dropped. Timing uses the fixed DEFAULT_BPM tempo.
    Prints a short summary; returns nothing.
    """
    note_length_ticks = 200

    reference_path = re.sub(r'midi_output', 'midi_cleaned', filename)
    reference_midi = MidiFile(reference_path)

    usec_per_beat = mido.bpm2tempo(DEFAULT_BPM)
    ppqn = reference_midi.ticks_per_beat

    midi_file_out = mido.MidiFile(ticks_per_beat=ppqn)
    reference_track = reference_midi.tracks[0]
    predicted_track = mido.MidiTrack()

    # Tick at which the previously written note ends (0 before the first note).
    previous_note_end = 0
    for step in range(nn_output.shape[0]):
    #{
        step_tick = mido.second2tick(step * sec_per_step, ppqn, usec_per_beat)
        is_strike = nn_output[step] == 1
        if is_strike and (previous_note_end < step_tick):
        #{
            # MIDI times are deltas: wait since the previous note ended, then hold the note.
            wait_ticks = int(step_tick - previous_note_end)
            predicted_track.append(mido.Message('note_on', note=60, velocity=64, time=wait_ticks))
            predicted_track.append(mido.Message('note_on', note=60, velocity=0, time=note_length_ticks))
            previous_note_end = int(step_tick) + note_length_ticks
        #}
    #}

    predicted_track.append(mido.MetaMessage('end_of_track'))
    midi_file_out.tracks.append(reference_track)
    midi_file_out.tracks.append(predicted_track)
    midi_file_out.save(filename)

    # Length of the predicted track, from the sum of its delta times
    total_ticks = sum(message.time for message in midi_file_out.tracks[1])
    seconds = mido.tick2second(total_ticks, ppqn, usec_per_beat)

    print("Saved to MIDI file:", filename)
    print("  Number of ticks per beat (PPQN):", ppqn)
    print("  Number of messages:", len(midi_file_out.tracks[0]))
    print("  Number of seconds of messages:", seconds, "\n")
#}

In [None]:
def evaluate_model(model, X, Y):
#{
    """Predict on X, print accuracy / precision / recall against Y, return the predictions.

    model -- any object with a predict(X) method returning values in [0, 1].
    X     -- model input.
    Y     -- array of 0/1 labels.

    Predictions are rounded to 0/1 and reshaped to Y's shape when the shapes
    differ. Accuracy is the fraction of matching entries over the number of
    entries in Y itself (it used to be divided by the global training count
    `m`, which is wrong for any set of a different size, e.g. validation).
    Precision and recall are reported as nan when their denominator is zero.

    Returns the rounded prediction array (same shape as Y).
    """
    # Evaluate model
    output = np.round(model.predict(X))

    if output.shape != Y.shape:
        output = np.reshape(output, Y.shape)

    total_labels = Y.size
    total_pred_correct = np.sum(np.equal(Y, output))
    total_pred_positive = np.sum(np.logical_and(output, 1))
    true_positive = np.sum(np.logical_and(Y, output))
    # Label is 1 where prediction and label disagree -> missed strike.
    false_negative = np.sum(np.logical_and(np.logical_xor(Y, output), Y))
    total_actual_positive = true_positive + false_negative

    undefined = float('nan')
    accuracy = total_pred_correct / total_labels if total_labels else undefined
    precision = true_positive / total_pred_positive if total_pred_positive else undefined
    recall = true_positive / total_actual_positive if total_actual_positive else undefined

    print("Model prediction accuracy:", accuracy )
    print("Model prediction precision:", precision )
    print("Model prediction recall:", recall, "\n")

    return(output)
#}

# Score the most recently trained `model` on both data sets and keep the rounded predictions.
print("Training set:")
output_train = evaluate_model(model, X_train, Y_train)

print("Validation set:")
output_val = evaluate_model(model, X_val, Y_val)

# Generate an actual MIDI file from the model to compare visually with original
# NOTE(review): if re-enabled, these paths lack the '../' prefix used elsewhere and the
# validation call passes sec_per_step_train rather than sec_per_step_val - confirm.
#output_midi_file(output_train, 'data/midi_output/88_Key_Ascending_Chromatic_Scale.mid', sec_per_step_train)
#output_midi_file(output_val, 'data/midi_output/88_Key_Descending_Chromatic_Scale.mid', sec_per_step_train)

In [None]:
# Compare to non-ML estimation using just power_output maxima to estimate note_strikes
power_output = [np.sum(spectrum[fft_imin_train:fft_imax_train]) for spectrum in spectral_train]
power_spikes = np.reshape(find_power_spikes(power_output), (-1,1))    

# NOTE(review): when the notebook is run top to bottom, Y_train was last assigned by
# generate_power_spike_labels(), i.e. it holds these same power spikes, so the scores
# below would be trivially perfect. Confirm whether MIDI-derived labels were intended here.
total_pred_correct = np.sum(np.equal(Y_train, power_spikes))
total_pred_positive = np.sum(np.logical_and(power_spikes, 1))
true_positive = np.sum(np.logical_and(Y_train, power_spikes))
# Label is 1 where estimate and label disagree -> missed strike.
false_negative = np.sum(np.logical_and(np.logical_xor(Y_train, power_spikes), Y_train))

print("Naive power-max model prediction accuracy:", total_pred_correct/m )
print("Naive power-max model prediction precision:", true_positive/total_pred_positive )
print("Naive power-max model prediction recall:", true_positive/(true_positive + false_negative), "\n")

# Generate the MIDI file from the model to compare visually with original
output_midi_file(power_spikes, '../data/midi_output/88_Key_Ascending_Chromatic_Scale.mid', sec_per_step_train)