# 1 - Globals
Let's first define some configuration variables that will be used throughout.  Always run this as different sections may use it.

In [1]:
ROLLS_FILE = './ragtime-generator.bin'
MODEL_FILE = './ragtime-generator.hdf5'
MIDI_PATH = './data/'

MAX_DURATION = 12

NOTE_SEP = '!'
REST_KEY = '@'

SEQUENCE_LENGTH = 8

# 2 - Conversion
Let's now preprocess all of the midi files into a representative format.

In [2]:
import music21 as m21

def convert_midi_to_roll(file):
    midi = m21.converter.parse(file)
    
    note_filter = m21.stream.filters.ClassFilter('Note')
    chord_filter = m21.stream.filters.ClassFilter('Chord')
    rest_filter = m21.stream.filters.ClassFilter('Rest')

    # Build a list of notes, chords, and rests and sort them by offset (time)
    notes = []
    notes.extend(list(midi.recurse().addFilter(note_filter)))
    notes.extend(list(midi.recurse().addFilter(chord_filter)))
    notes.extend(list(midi.recurse().addFilter(rest_filter)))
    notes = sorted(notes, key=lambda x: x.offset)
    
    # Process the notes into a roll.
    num_skipped = 0 # How many notes were skipped (for any reason)?
    prev_offset = 0.0 # Previous element's offset. Used for dumping.
    notes_to_dump = [] # Accumulated notes in a single offset.
    durations_to_dump = [] # Durations matching notes_to_dump.
    roll = []
    for idx, el in enumerate(notes):
        # Skip zero-length elements (midi bug).
        if 'zero' == el.duration.type:
            #print(f'Skipping zero duration: {el}')
            num_skipped += 1
            continue
        
        # Skip lengthy durations.
        if el.duration.quarterLength > MAX_DURATION:
            #print(f'Skipping long duration {el.duration.quarterLength}: {el}')
            num_skipped += 1
            continue
        
        # Dump notes when the next note's offset is different.
        if el.offset != prev_offset:
            if len(notes_to_dump): # Must have this to allow updating of the first element without dumping nothing.
                roll.append(NOTE_SEP.join(str(n.pitch if isinstance(n, m21.note.Note) else n) + '$' + str(d) for n, d in zip(notes_to_dump, durations_to_dump)))
                notes_to_dump = []
                durations_to_dump = []
            prev_offset = el.offset
        
        # Append notes.
        if isinstance(el, m21.note.Note):
            notes_to_dump.append(el)
            durations_to_dump.append(el.duration.quarterLength)
        
        # Append notes from chords.
        if isinstance(el, m21.chord.Chord):
            notes_to_dump.extend(el.notes)
            durations_to_dump.extend([el.duration.quarterLength for n in el.notes])
        
        # Append rests (instantly).
        if isinstance(el, m21.note.Rest):
            roll.append(f'{REST_KEY}${el.duration.quarterLength}')
        
    # Dump remaining notes.
    if len(notes_to_dump):
        roll.append(NOTE_SEP.join(str(n.pitch if isinstance(n, m21.note.Note) else n) + '$' + str(d) for n, d in zip(notes_to_dump, durations_to_dump)))
        notes_to_dump = []
        durations_to_dump = []
    
    return roll
#end

In [3]:
import os
import glob
import pickle

# If the notes file already exists, just load it.
if os.path.exists(ROLLS_FILE):
    with open(ROLLS_FILE, 'rb') as file:
        rolls = pickle.load(file)
    print('Loaded existing rolls pickle.')
else:
    # Convert all midi files, saving their rolls into a list.
    rolls = []
    midi_files = ['./data/C_mapleaf.mid', './data/C_original.mid']#glob.glob(MIDI_PATH + '*.mid')
    for midi_file in midi_files:
        filename = os.path.basename(midi_file)
        print(f'Processing `{filename}`...', end='')
        roll = convert_midi_to_roll(midi_file)
        rolls.append(roll)
        print('done!')
    
    # Write out the rolls to a pickle.
    with open(ROLLS_FILE, 'wb') as file:
        pickle.dump(rolls, file)
    print('Wrote rolls pickle to file.')

Loaded existing rolls pickle.


# 3 - Training
Now let's take all of the processed rolls and create the `X` and `y` data for training.

## Mappings
Generate some mappings to go between unique notes and integers.  The integers are used in the neural network.

In [4]:
# All rolls flattened.
flat_rolls = [item for sublist in rolls for item in sublist]

# All unique notes across all flattened rolls.
unique_notes = sorted(set(flat_rolls))

# Build two dictionaries.  One maps notes (as strings) to ints, and the other backwards.
# We use the first to convert the rolls into a sequence of integers, and the second to convert back to notes.
note_to_int = dict((note, num) for num, note in enumerate(unique_notes))
int_to_note = dict((num, note) for num, note in enumerate(unique_notes))

## `X` and `y` data
Building the training data using a sliding window.  Since the rolls are a nested list - one for each piece - I'm going to ensure that the sliding window does not go over a boundary (hence the nested lists).  Essentially, I'm creating `X` and `y` from sliding windows over different pieces joined together rather than treating the entire thing as one giant sequence.

In [5]:
import numpy as np
from keras.utils import np_utils

data_X = []
data_y = []

# Apply a sliding window per piece but append the same data array.
# This avoids a sliding window overlapping the boundaries between pieces.
for roll in rolls:
    for i in range(0, len(roll) - SEQUENCE_LENGTH):
        # Snip a sequence of our piece as the X data of this window.
        seq_in = roll[i:i + SEQUENCE_LENGTH]
        data_X.append([note_to_int[n] for n in seq_in])
        
        # Take the next note as the y value to predict.
        seq_out = roll[i + SEQUENCE_LENGTH]
        data_y.append(note_to_int[seq_out])
    #end
#end

# Create and shape the final X and y data for the network.
X = np.reshape(data_X, (len(data_X), SEQUENCE_LENGTH, 1))
X = X / float(len(flat_rolls))
y = np_utils.to_categorical(data_y)

Using TensorFlow backend.


## Train/Test Split
Of course we now need to split our data into a training and testing set to better avoid overfitting.

In [6]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

print(f'{len(X_train)} train samples and {len(X_test)} test samples.')

21226 train samples and 9097 test samples.


## Neural network
And now build the model.

In [7]:
from keras.models import Sequential
from keras.layers import LSTM, Dropout, Dense
import keras.backend as K

# A fresh start when debugging
K.clear_session()

# Build the model.
model = Sequential()
model.add(LSTM(128, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(128))
model.add(Dropout(0.2))
model.add(Dense(y_train.shape[1], activation='softmax'))
#
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

model.summary()

# TODO: Try a custom instance of ADAM with a lowered learning rate.
# TODO: Investigate a custom loss function that takes into account what is a 'good' note.

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_1 (LSTM)                (None, 8, 128)            66560     
_________________________________________________________________
dropout_1 (Dropout)          (None, 8, 128)            0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 128)               131584    
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 5827)              751683    
Total params: 949,827
Trainable params: 949,827
Non-trainable params: 0
_________________________________________________________________


Load weights and/or train the model.

In [8]:
from keras.callbacks import ModelCheckpoint

checkpoint = ModelCheckpoint(
    MODEL_FILE,
    monitor='loss',
    verbose=0,
    save_best_only=True,
    mode='min'
)

# Load model weights if they already exist.
if os.path.exists(MODEL_FILE):
    model.load_weights(MODEL_FILE)
    print('Loaded existing weights.')

# Should training take place?
should_train = False
# How many epochs for?
train_epochs = 50 # 100 total (50 before)
# How many per batch?
batch_size = 8*2

if should_train:
    history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=train_epochs, batch_size=batch_size, callbacks=[checkpoint])

Loaded existing weights.


# 4 - Prediction
Now it's time to make some music!  Don't forget to load the model first.

In [12]:
import music21 as m21

def split_note_duration(pattern):
    n, d = pattern.split('$')
    if '/' in d:
        a, b = d.split('/')
        d = float(a) / float(b)
    else:
        d = float(d)
    return n, d
#end

def convert_roll_to_midi(notes_array, filename):
    offset = 0.0
    output_notes = []
    
    for pattern in notes_array:
        # handle chords (i.e. multiple notes split by NOTE_SEPARATOR)
        if NOTE_SEP in pattern:
            chord_notes = []
            for chord_note in pattern.split(NOTE_SEP):
                note_name, note_duration = split_note_duration(chord_note)
                new_note = m21.note.Note(note_name)
                new_note.offset = offset
                new_note.storedInstrument = m21.instrument.Piano
                new_note.duration = m21.duration.Duration(note_duration)
                output_notes.append(new_note)
#                 chord_notes.append(new_note)
            #end
            new_chord = m21.chord.Chord(chord_notes)
            new_chord.offset = offset
#             output_notes.append(new_chord)
        #end
        else:
            note_name, note_duration = split_note_duration(pattern)
            # handle rests
            if REST_KEY == note_name:
                new_rest = m21.note.Rest()
                new_rest.offset = offset
                new_rest.duration = m21.duration.Duration(note_duration)
                output_notes.append(new_rest)
            else:
                new_note = m21.note.Note(note_name)
                new_note.offset = offset
                new_note.duration = m21.duration.Duration(note_duration)
                output_notes.append(new_note)
        #end
        
        offset += 0.25 # TODO: Solve this to not be fixed like this.
    #end
    
    midi_stream = m21.stream.Stream(output_notes)
    midi_stream.timeSignature = m21.meter.TimeSignature('2/4')
    midi_stream.keySignature = m21.key.KeySignature(0)
    midi_stream.write('midi', fp=filename)
    return output_notes
#end

In [15]:
import numpy as np

# How many of the best predictions to randomly select from instead of the best only.
# Setting this to 1 takes the best prediction each time but is more prone to loops.
num_top_preds = 3

# If true, feeds the best choice back to the pattern even if another choice was chosen.
feed_best_choice = False

# Get a random starting point.
start_idx = np.random.randint(0, len(data_X))
pattern = data_X[start_idx]

# Generate!
output = []
for idx in range(20 * SEQUENCE_LENGTH):
    # Shape the input and make a prediction.
    pred_input = np.reshape(pattern, (1, len(pattern), 1))
    pred_input = pred_input / float(len(flat_rolls))
    predictions = model.predict(pred_input)
    
    # Sample one of the best top choices.
    top_predictions = np.argpartition(predictions[0], -num_top_preds)[-num_top_preds:]
    predicted_index = top_predictions[np.random.randint(0, num_top_preds)]
    
    # Convert and save the best choice.
    output.append(int_to_note[predicted_index])
    
    # Feed data back into the prediction pattern.
    pattern.append(np.argmax(predictions[0]) if feed_best_choice else predicted_index)
    pattern = pattern[1:len(pattern)]

In [16]:
# Write just the output.
_ = convert_roll_to_midi(output, './output.mid')