___
# mu7RON Demo 2
---

In this tutorial we will prepare some MIDI sequences as input and use them to train an LSTM or GRU network.

In [1]:
%pylab inline
import os
import copy
import hashlib
from zipfile import ZipFile

import numpy as np
import midi
from tqdm import tqdm

from mu7ron import analyze, aug, edit, generate, coders, learn, maps, utils, visualize, temporal

Populating the interactive namespace from numpy and matplotlib
pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


The directories that contain my midi files:

In [2]:
# Root folder of the MIDI corpus: four levels above the working directory.
# It is assembled from path components instead of a backslash literal, so the
# same cell resolves on Windows, Linux and macOS.
data_dir = os.path.join(os.getcwd(), *([os.pardir] * 4), '__Data__', 'Midi', 'Bach')

# Sub-folders of data_dir that are searched for MIDI files.
sub_dirs = [
            "Dave's",
           ]

In [3]:
dirs = [os.path.join(data_dir, dir_) for dir_ in sub_dirs]

# Retrieve all filenames ending '.mid' or '.midi' (case-insensitive).
# os.walk yields entries in an arbitrary, filesystem-dependent order, so both
# the sub-directories and the files are sorted: the seeded shuffle further
# down only gives a reproducible split if this list is in a stable order.
fnames = []
for dir_ in tqdm(dirs):
    for root, subs, files in os.walk(dir_):
        subs.sort()  # in-place, so os.walk descends in sorted order
        for fn in sorted(files):
            if fn.lower().endswith(('.mid', '.midi')):
                fnames.append(os.path.join(root, fn))

100%|███████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 111.12it/s]


Load the MIDI files with python-midi. Some of my files contain errors, so I use a try/except block to skip the ones that fail to parse.

In [4]:
data   = []
failed = []  # (filename, exception) for every file python-midi could not parse

for fn in tqdm(fnames):
    try:
        data.append(midi.read_midifile(fn))
    except (TypeError, AssertionError) as err:
        # Best-effort load: a broken file is skipped, but it is recorded so
        # the loss is visible instead of silently shrinking the data set.
        failed.append((fn, err))

if failed:
    print(f"Skipped {len(failed)} of {len(fnames)} files that could not be parsed.")

100%|████████████████████████████████████████████████████████████████████████████████| 261/261 [00:13<00:00, 18.85it/s]


We need to decide on some variables that must stay constant throughout training and whenever we make predictions / generate music.

In [5]:
# --- event encoding ---------------------------------------------------------
# whether to encode NoteOffEvents or just NoteOnEvents with 0 velocity
off_mode = True
# quantization factor of velocity
q = 18
q_map = maps.create_q_map(128, q, encode=True, decode=True)
# the smallest timestep in milliseconds
t = 8

# --- vocabulary -------------------------------------------------------------
# number of different available levels of velocity
n_vel = utils.dynamic_order(128, q)
# available timesteps
n_time = 48
# available pitches: 256 when off_mode is set, otherwise 128
n_pitch = 2 * 128 if off_mode else 128
# number of added pulses
n_pulse = 0
# available choices (size of the vocabulary)
n_vocab = n_time + n_pitch + n_vel + n_pulse

# --- batching ---------------------------------------------------------------
# batch size
n_sample = 240
# time steps - the temporal dimension
n_input = 120
# number of timesteps to predict into the future
n_output = 1
n_teach = 7
n_step = n_input + n_output + n_sample - 2
n_example = 1
buffer = 150
random_state = 117

# --- time coding ------------------------------------------------------------
# The base-digits coder is used here; the timeslips encoder/decoder is the
# alternative, in which case ekwa would be dict(t=t, n_time=n_time).
time_encoder = temporal.base_digits_encoder
time_decoder = temporal.base_digits_decoder
ekwa = {'b': n_time}
dkwa = {'b': n_time}

In [6]:
# A midi file is kept only when every instrument it uses belongs to this set,
# or when it defines no instrument at all (which defaults to acoustic piano).
# Programs currently left out of the set: 45, 56-104 and 113-127.
insts_to_keep = set(range(40))

data = [ptrn for ptrn in data if analyze.has_only_inst(ptrn, insts_to_keep)]

len(data)

70

In [7]:
# Keep the pieces whose maximum number of simultaneous notes lies strictly
# between 1 and 4.
kept = []
for ptrn in data:
    if 1 < analyze.max_simulataneous_notes(ptrn) < 4:
        kept.append(ptrn)
data = kept
len(data)

24

In [8]:
# Event types that survive the first filtering pass; TimeSignatureEvent is
# still needed at this stage because the patterns are split on it.
typs_2_keep = (midi.NoteOffEvent, midi.NoteOnEvent,
               midi.SetTempoEvent, midi.TimeSignatureEvent)

def copy_func(old_evnt, new_typ):
    """Create a `new_typ` event from `old_evnt` with its second data value zeroed.

    The tick, the channel and the first data value are taken over unchanged;
    the second data value is always 0. In this notebook it is passed to
    edit.replace_evnt_typ to turn NoteOffEvents into NoteOnEvents.

    Args:
        old_evnt: event exposing `tick`, `data` and `channel`.
        new_typ: callable accepting `tick`, `data` and `channel` keywords.

    Returns:
        Whatever `new_typ` returns for those keywords.
    """
    fields = {
        'tick': old_evnt.tick,
        'data': [old_evnt.data[0], 0],
        'channel': old_evnt.channel,
    }
    return new_typ(**fields)


def _prepare_ptrn(ptrn):
    """Run one pattern through the clean-up steps and return the split result."""
    # drop every event type that is not listed in typs_2_keep
    ptrn = edit.filter_ptrn_of_evnt_typs(ptrn, typs_2_keep)

    # drop NoteOnEvents on channel 9 (10), which is used exclusively for percussion
    ptrn = edit.filter_ptrn_of_percussion(ptrn)

    # merge all Tracks of the Pattern into a single Track
    ptrn = edit.consolidate_trcks(ptrn)

    # move to a resolution of 480 while adjusting all tempo events, so that the
    # sounding speed of the music is preserved
    ptrn = edit.normalize_resolution(ptrn, res=480)

    # quantize velocity into fewer possible values
    ptrn = edit.quantize_typ_attr(ptrn, q, (midi.NoteOnEvent, midi.NoteOffEvent), lambda x: x.data[1])

    # consolidating tracks leaves many redundant/duplicate events behind
    ptrn = edit.dedupe(ptrn)

    if not off_mode:
        ptrn = edit.replace_evnt_typ(ptrn, midi.NoteOffEvent, midi.NoteOnEvent, copy_func=copy_func)

    # split the pattern wherever a time-signature event occurs
    return edit.split_on_timesignature_change(ptrn, midi.TimeSignatureEvent)


data = [_prepare_ptrn(ptrn) for ptrn in tqdm(data)]

# the split step returns several pieces per input, so flatten one level
data = list(utils.flatten(data, depth=1))

100%|██████████████████████████████████████████████████████████████████████████████████| 24/24 [00:06<00:00,  3.96it/s]


In [9]:
# Discard the patterns that are empty after the filtering and splitting steps
# (presumably those with no events left — see edit.filter_data_of_empty_ptrn).
data = edit.filter_data_of_empty_ptrn(data)

In [10]:
# The patterns have been split on time-signature changes, so the
# TimeSignatureEvents themselves can go: filter again without that type.
typs_2_keep = (midi.NoteOffEvent, midi.NoteOnEvent, midi.SetTempoEvent)

data = [edit.filter_ptrn_of_evnt_typs(ptrn, typs_2_keep) for ptrn in tqdm(data)]


100%|██████████████████████████████████████████████████████████████████████████████████| 37/37 [00:01<00:00, 24.34it/s]


In [11]:
# Replace every pattern by its first element (presumably the single track left
# by the consolidation step).
# NOTE: not idempotent — running this cell twice indexes one level deeper.
data = [pattern[0] for pattern in data]

In [12]:
# Settings shared by every call to coders.categorize_input; they have to match
# the constants chosen in the configuration cell.
encode_kwa = dict(
    q=q,
    q_map=q_map,
    time_encoder=time_encoder,
    ekwa=ekwa,
    n_time=n_time,
    n_vel=n_vel,
    sort_velocity=True,
    sort_pitch=True,
    sort_pulse=False,
    asarray=True,
    dtype='int',
    off_mode=off_mode,
)

# one encoded sequence per track
input_ = [coders.categorize_input(trck, **encode_kwa) for trck in tqdm(data)]


100%|█████████████████████████████████████████████████████████████████████████████████| 37/37 [00:00<00:00, 107.25it/s]


In [13]:
# Shuffle the encoded sequences reproducibly, then hold out the last
# `test_frac` share of them for validation.
np.random.seed(random_state)

order = np.random.permutation(range(len(input_)))

# The sequences can differ in length, so they are placed one by one into an
# explicit 1-D object array. np.array(input_) on ragged input is deprecated
# since NumPy 1.19 and raises ValueError from NumPy 1.24 on.
shuffled = np.empty(len(input_), dtype=object)
for pos, src in enumerate(order):
    shuffled[pos] = input_[src]
input_ = shuffled

test_frac = 0.2
n         = len(input_)
idx       = int(round(n * (1. - test_frac)))  # index of the first validation item
train, valid = input_[: idx],  input_[idx: ]

In [14]:
# window / batch geometry that both generators have to agree on
shared_kwa = dict(n_input=n_input, n_output=n_output, n_sample=n_sample, n_vocab=n_vocab)

# settings that only apply to the augmenting training generator
train_kwa = dict(
    teacher_forcing=True,
    n_teach=n_teach,
    n_step=n_step,
    buffer=buffer,
    time_encoder=time_encoder,
    time_decoder=time_decoder,
    ekwa=ekwa,
    dkwa=dkwa,
    n_time=n_time,
    off_mode=off_mode,
    lo_lim=60,
    hi_lim=90,
    time_aug_range=[0.95, 1., 1.05],
)

train_gen = aug.MappedDataAugGen(train, **shared_kwa, **train_kwa)
valid_gen = aug.MappedBalancedDataGen(valid, n_example=n_example, **shared_kwa)

8 samples were removed because they are too short. There are 22 remaining. 
If this is not enough, try reducing n_step, n_teach and/or buffer and create a new instance of MappedDataAugGen


In [15]:
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout, GRU, Activation
from tensorflow.compat.v1 import ConfigProto, Session
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, Callback
from tensorflow.keras import optimizers
from tensorflow.keras.metrics import CategoricalAccuracy, CategoricalCrossentropy
from tensorflow.keras import backend as K

In [16]:
# Create a TF1-compat Session whose GPU options have allow_growth switched on.
# NOTE(review): presumably meant to stop TensorFlow from reserving all GPU
# memory up front — confirm the setting takes effect in this TF version.
config = ConfigProto()
config.gpu_options.allow_growth = True
session = Session(config=config)
import operator

In [17]:
# Nadam optimizer with gradient-norm clipping.
# clipnorm is the norm threshold and is documented as a float, so it is spelled
# 1.0 here; the previous `True` only worked where a bool was coerced to 1.
opt = optimizers.Nadam(
                      learning_rate=0.001,
                      beta_1=0.9,
                      beta_2=0.999,
                      epsilon=1e-07,
                      name="Nadam",
                      clipnorm=1.0,
                      )


class CustomCheckpoint(Callback):
    """Save the model each time the monitored metric improves and log every epoch.

    Keyword Args:
        save_dir: directory that receives the saved files. Defaults to
            ``<cwd>/models``; it is created on the first save if missing.
        monitor: key of the epoch ``logs`` to track (default ``'val_loss'``).
            A name ending in ``'loss'`` is minimised, any other is maximised.
        verbose: print a one-line summary after every epoch (default True).
        save_mode: name of the model method used to write the file,
            ``'save'`` (default) or ``'save_weights'``.
    """

    # (label, logs key) pairs shown in the per-epoch summary line
    _REPORTED = (
        ('loss', 'loss'),
        ('acc', 'categorical_accuracy'),
        ('val_acc', 'val_categorical_accuracy'),
        ('val_loss', 'val_loss'),
    )

    def __init__(self, **kwargs):
        super().__init__()

        self.save_dir  = kwargs.get('save_dir', os.path.join(os.getcwd(), 'models'))
        self.monitor   = kwargs.get('monitor', 'val_loss')
        self.verbose   = kwargs.get('verbose', True)
        self.save_mode = kwargs.get('save_mode', 'save') # save_weights

        # losses improve downwards, every other metric upwards
        if self.monitor.endswith('loss'):
            self.best  = np.inf
            self.oper  = operator.lt
        else:
            self.best  = -np.inf
            self.oper  = operator.gt

    def on_epoch_end(self, epoch, logs=None):
        """Compare the monitored value with the best so far and save on improvement.

        Args:
            epoch: index of the epoch that just finished.
            logs: metric results of that epoch, keyed by metric name.

        Raises:
            KeyError: if the monitored metric is not present in `logs`.
        """
        logs = logs or {}
        if self.monitor not in logs:
            raise KeyError(
                f"{self.monitor!r} is not in the epoch logs; available keys: {sorted(logs)}"
            )

        current = logs[self.monitor]
        imp = self.oper(current, self.best)
        if imp:
            val = str(current).replace('.', '_')
            save_path = os.path.join(self.save_dir, f"{utils.tstamp(f'model_{val}')}.h5")

        if self.verbose:
            if imp:
                msg = f"{self.monitor} improved from {self.best} to {current}, saving model to {save_path}"
            else:
                # report the metric that is actually monitored, not always val_loss
                msg = f"{self.monitor} did not improve from {self.best}"
            # only metrics present in logs are shown, so a model compiled with
            # other metrics does not break the summary line
            stats = ' '.join(
                f"{label}: {round(logs[key], 4)}"
                for label, key in self._REPORTED
                if key in logs
            )
            print(f"epoch: {epoch} {msg} {stats}")

        if imp:
            # writing into a directory that does not exist fails, so create it first
            os.makedirs(self.save_dir, exist_ok=True)
            getattr(self.model, self.save_mode)(save_path, overwrite=True)
            self.best = current


# Checkpoint callback with its defaults: monitors 'val_loss' and saves under ./models.
autosaver = CustomCheckpoint()

# TensorBoard logs are written to ./logs/<utils.tstamp('mugen_lstm')>.
# NOTE(review): profile_batch=0 presumably switches the profiler off — confirm.
log_dir = os.path.join(os.getcwd(), 'logs', utils.tstamp('mugen_lstm'))
tensorboard = TensorBoard(log_dir=log_dir, profile_batch=0)

In [18]:
# Two stacked GRU layers with dropout after each, followed by a softmax over
# the vocabulary: the network predicts the next token of the sequence.
model = Sequential([
    # the first GRU returns the whole sequence so that the second one can consume it
    GRU(240, input_shape=(n_input, n_vocab), return_sequences=True),
    Dropout(0.3),
    GRU(240),
    Dropout(0.3),
    Dense(n_vocab),
    Activation('softmax'),
])

model.compile(
    optimizer=opt,
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
gru (GRU)                    (None, 120, 240)          398880    
_________________________________________________________________
dropout (Dropout)            (None, 120, 240)          0         
_________________________________________________________________
gru_1 (GRU)                  (None, 240)               347040    
_________________________________________________________________
dropout_1 (Dropout)          (None, 240)               0         
_________________________________________________________________
dense (Dense)                (None, 312)               75192     
_________________________________________________________________
activation (Activation)      (None, 312)               0         
Total params: 821,112
Trainable params: 821,112
Non-trainable params: 0
__________________________________________________

In [None]:
# Train for up to 1000 epochs with valid_gen as validation data; the callbacks
# write TensorBoard logs and checkpoint the model when val_loss improves.
# NOTE(review): workers=-1 is not a documented value for Keras `fit` — it looks
# like the scikit-learn n_jobs=-1 idiom; confirm what it does in this TF version.
model.fit(train_gen, epochs=1000, verbose=True, callbacks=[tensorboard, autosaver], validation_data=valid_gen, workers=-1)

Train for 2835 steps, validate for 2 steps
Epoch 1/1000
Epoch 2/1000
 659/2835 [=====>........................] - ETA: 3:51 - loss: 1.8922 - categorical_accuracy: 0.5076