# Overview of Notebook Functions
 1. read data with music21
 2. map input
 3. sequence input
 4. package network input into 1 pickle 
 5. train model
 6. save best weights into hdf5 

# Notebook Use After Training 
 7. use weights in hdf5 to generate new notes 
 8. unmap sequences
 9. add to music21 stream  
 10. output midi file

**Note:** The first run reads the MIDI files and then writes two pickle files: the network sequences (`nw.pkl`) and the normalizers (`normalize.pkl`). Later runs load these pickles instead of re-reading the MIDI files.

In [2]:
from music21 import converter, interval, instrument, note, chord, common, stream, midi, tempo, pitch
import glob
import concurrent.futures
import pickle 
import numpy as np 
from keras.utils import np_utils 

In [3]:
def isolate(mfile):
    """Parse a score file, transpose it towards C and pick out a piano part.

    Args:
        mfile: path of the file handed to ``converter.parse``.

    Returns:
        ``[mfile, part]`` for the first instrument partition whose
        ``partName`` contains "Piano", or ``None`` when the tonic comparison
        below is true, when partitioning yields nothing, or when no such
        part exists.
    """
    parsed = converter.parse(mfile)
    tonic = parsed.analyze("key").tonic

    # NOTE(review): this compares the tonic object against a plain string;
    # confirm that music21 treats that as "the piece is in C".
    if tonic == "C":
        return None

    # Shift by the interval from the detected tonic to C, then flatten.
    to_c = interval.Interval(tonic, pitch.Pitch("C"))
    flattened = parsed.transpose(to_c).flat

    by_instrument = instrument.partitionByInstrument(flattened)
    if not by_instrument:
        return None

    for part in by_instrument:
        part_name = part.partName
        # xml uses variations like "Grand Piano"
        if part_name and "Piano" in part_name:
            return [mfile, part]

    return None


In [4]:
def parse_notes(score):
    """Group the notes, chords and rests of a score into two-slot frames.

    Elements are visited via ``score.recurse()``. Elements with a
    non-positive quarter length, and elements that are not a Note, Chord or
    Rest, are ignored. An element whose offset differs from the previous
    kept element opens a new frame; an element at the same offset fills the
    second slot of the current frame if it is still free, otherwise it is
    dropped. (The original author's comments label the two slots
    "treble" and "bass".)

    Returns:
        ``(note_list, elem_lengths, pitches, offsets)`` where

        * ``note_list`` has one entry per frame: a pitch string, a list of
          pitch strings for a chord, or ``None`` for a rest;
        * ``pitches`` / ``elem_lengths`` / ``offsets`` hold one two-element
          list per frame after padding (see ``fixitup``).
    """
    note_list = []
    pitches = []
    elem_lengths = []
    offsets = []
    prev_offset = -1

    for element in score.recurse():
        if element.duration.quarterLength <= 0:
            continue

        # Work out the label (for note_list) and the value stored in the frame.
        if isinstance(element, note.Note):
            label = str(element.pitch)
            value = element.pitch
        elif isinstance(element, chord.Chord):
            label = '.'.join(str(n.pitch) for n in element).split('.')
            value = [k.pitch for k in element]  # a chord
        elif isinstance(element, note.Rest):
            label = None
            value = None
        else:
            continue

        here = element.offset
        duration = round(float(element.duration.quarterLength), 8)

        if here != prev_offset:
            # brand new time offset: open a frame
            note_list.append(label)
            pitches.append([value])
            elem_lengths.append([duration])
            offsets.append([float(here)])
        elif len(pitches[-1]) != 2:
            # same offset and the current frame still has a free slot
            pitches[-1].append(value)
            elem_lengths[-1].append(duration)
            offsets[-1].append(float(here))
        prev_offset = here

    def fixitup(arr, addNone=True):
        """Pad every one-element frame of ``arr`` to two elements, in place.

        With ``addNone`` the filler is ``None``. Otherwise the frame's single
        value is added onto the second slot of the previous frame and the
        filler is 0; for the first two frames the value is duplicated
        instead.
        """
        for idx, frame in enumerate(arr):
            if len(frame) >= 2:
                continue
            if addNone:
                frame.append(None)
            elif idx > 1:  # NOTE(review): index 1 also takes the duplicate path
                arr[idx-1][1] += frame[0]  # extend length of previous
                frame.append(0)
            else:
                frame.append(frame[0])

    fixitup(pitches, addNone=True)
    fixitup(elem_lengths, addNone=False)
    fixitup(offsets)

    return (note_list, elem_lengths, pitches, offsets)

In [5]:
# Demo: build a Pitch from the number 63, then print it together with whether
# Pitch(63), "Eb4" and "D#4" all report the same .ps value.
c = pitch.Pitch(63)
print(c, pitch.Pitch(63).ps == pitch.Pitch("Eb4").ps == pitch.Pitch('D#4').ps)
 
def numerical(name):
    """Return ``pitch.Pitch(name).ps``.

    Best effort: if constructing the pitch raises, "Bad name" and the
    offending name are printed and ``None`` is returned.
    """
    try:
        value = pitch.Pitch(name).ps
    except Exception:
        print("Bad name", name)
        return None
    return value
 

E-4 True


In [6]:
def pstr(mpitch):
    """Return the string key used to identify a frame slot.

    Args:
        mpitch: a falsy value (``None``, empty list), a string, a single
            pitch object, or a list/set of pitch objects (a chord).

    Returns:
        ``"r"`` for a falsy value or a string containing "r"; the ``.ps``
        values joined with "-" for a list/set; otherwise ``str`` of the
        single pitch's ``.ps``.

    Raises:
        ValueError: for a string containing "b". The previous code used a
            bare ``raise`` here, which fails with an unrelated
            ``RuntimeError`` because no exception is active.
    """

    def octv(k):
        # Rebuild the pitch from its name and octave only.
        return pitch.Pitch(k.name + str(k.octave))

    if not mpitch or (isinstance(mpitch, str) and "r" in mpitch):
        return "r"
    if isinstance(mpitch, str) and "b" in mpitch:
        raise ValueError("b is not allowed! got " + repr(mpitch))
    if isinstance(mpitch, (list, set)):
        return "-".join(str(octv(k).ps) for k in mpitch)
    return str(octv(mpitch).ps)

def map_values(notes, lengths, pitches, offsets, counter):
    """Convert one song's frames into numeric music vectors.

    Every slot of every frame is turned into its ``pstr`` key; keys not yet
    present in ``counter.name_to_num`` are assigned the next id (all first
    slots are registered before all second slots). Frames whose longer slot
    exceeds 8 quarter lengths are left out of the result.

    Args:
        notes: not used by this function.
        lengths: per-frame ``[first_length, second_length]``.
        pitches: per-frame ``[first_pitch, second_pitch]``.
        offsets: not used by this function.
        counter: object with ``enum_count``, ``all_names`` and
            ``name_to_num``; updated in place.

    Returns:
        ``(vector_list, counter)`` where each vector is
        ``[first_id, first_length, second_id, second_length]``.
    """
    name_to_num = counter.name_to_num
    next_id = counter.enum_count

    treble_names = [pstr(frame[0]) for frame in pitches]
    bass_names = [pstr(frame[1]) for frame in pitches]
    all_names = treble_names + bass_names

    for name in all_names:
        if name in name_to_num:
            continue
        name_to_num[name] = next_id
        next_id += 1

    vector_list = []
    # zip stops at the shorter of pitches / lengths
    for treble, bass, frame_lengths in zip(treble_names, bass_names, lengths):
        treble_length, bass_length = frame_lengths[0], frame_lengths[1]
        if max(treble_length, bass_length) <= 8:
            vector_list.append(
                [name_to_num[treble], treble_length, name_to_num[bass], bass_length]
            )

    counter.enum_count = next_id
    counter.all_names = all_names  # names of this song only
    counter.name_to_num = name_to_num
    return (vector_list, counter)
     

In [7]:
# IPython shell magic: recursively and verbosely force-remove everything in the
# working directory whose name starts with "stream".
%rm -rvf stream* 

In [8]:
import sys 
import copy 
import math 
from numpy import argmax 
from keras.utils import normalize, to_categorical 
def qround(value):
    """Round ``value`` up to the next multiple of 0.25."""
    quarters = math.ceil(4 * value)
    return quarters / 4
def sequence_values(counter, normalizers): # mapped_notes, enum_count, name_to_num, 
    """Build the network input windows and one-hot targets.

    Each music vector in ``counter.mapped_notes`` is replaced in place by a
    "package" tuple ``(id, qround(length), id, qround(length))``. Every
    distinct package gets an integer id; each song is then cut into windows
    of ``normalizers.seq_size`` consecutive package ids, with the package
    that follows the window as its target.

    Fixes relative to the earlier version:
      * the one-hot targets always have ``num_pkgs`` columns, so they match
        the ``Dense(num_pkgs)`` output layer even if the highest package id
        never occurs as a target;
      * the final window of each song (whose target is the last frame) is
        no longer skipped;
      * an input with no song longer than ``seq_size`` raises a clear
        ``ValueError`` instead of an ``IndexError``.

    Args:
        counter: object whose ``mapped_notes`` is a list of per-song vector
            lists; the vectors are rewritten in place.
        normalizers: object providing ``seq_size``; ``pkg_to_int``,
            ``mean``, ``std``, ``num_pkgs`` and the four ``mpc``/``spc``/
            ``mln``/``sln`` fields are written.

    Returns:
        ``(nw_in, nw_out, counter, normalizers)`` with ``nw_in`` a float32
        array of shape ``(num_windows, seq_size, 1)`` and ``nw_out`` the
        one-hot targets with ``num_pkgs`` columns.

    Raises:
        ValueError: if no window could be built.
    """
    mapped_notes = counter.mapped_notes
    seq_size = normalizers.seq_size

    # Quantise the two lengths and collect the distinct packages.
    all_pkgs = set()
    for vector_list in mapped_notes:
        for idx, music_vector in enumerate(vector_list):
            result = (music_vector[0], qround(music_vector[1]),
                      music_vector[2], qround(music_vector[3]))
            all_pkgs.add(result)
            vector_list[idx] = result

    pkg_to_int = {pkg: number for number, pkg in enumerate(all_pkgs)}
    print("packaged notes, there are", len(pkg_to_int), "packages. \n\tpkgs=", list(all_pkgs)[0:5], "...")

    nw_in, nw_out = [], []
    for vector_list in mapped_notes:
        # start runs up to and including the window whose target is the last frame
        for start in range(len(vector_list) - seq_size):
            window = vector_list[start:start + seq_size]
            expected = vector_list[start + seq_size]
            nw_in.append([pkg_to_int[pkg] for pkg in window])
            nw_out.append(pkg_to_int[expected])

    if not nw_in:
        raise ValueError(
            "no training windows: every song has at most seq_size="
            + str(seq_size) + " frames"
        )

    print("Our code is done,", len(nw_in), len(nw_out))
    num_pkgs, num_windows = len(pkg_to_int), len(nw_in)
    pkg_ids = list(pkg_to_int.values())
    normalizers.mpc, normalizers.spc, normalizers.mln, normalizers.sln = 0, 0, 0, 0
    normalizers.pkg_to_int = pkg_to_int
    normalizers.mean, normalizers.std = np.mean(pkg_ids), np.std(pkg_ids)
    normalizers.num_pkgs = num_pkgs

    print(num_windows, "windows, each with", seq_size, "frames, (each with", 4, "features that map to 1 number), ", 
        "total=window*frame*1=", num_windows*seq_size*1)
    nw_in = np.asarray(nw_in, dtype=np.float32).reshape(num_windows, seq_size, 1)
    # Pin the width so it always equals the size of the network's output layer.
    nw_out = to_categorical(np.asarray(nw_out, dtype=np.float32), num_classes=num_pkgs)

    print("Input and output shapes:", nw_in.shape, nw_out.shape)
    return (nw_in, nw_out, counter, normalizers)
     

In [None]:
!pip install keras-self-attention 
from keras.models import Sequential 
from keras.layers import Activation, Dense, Bidirectional, LSTM, Dropout, Flatten, TimeDistributed, Reshape 
from keras.callbacks import ModelCheckpoint 
from keras.optimizers import SGD 
from keras.optimizers import Adam 
from keras_self_attention import SeqSelfAttention

def basic_network(counter, normalizers): # * 
    """Build and compile the two-layer LSTM classifier.

    Args:
        counter: not used by this function.
        normalizers: provides ``seq_size`` (window length) and ``num_pkgs``
            (number of output classes).

    Returns:
        A compiled ``Sequential`` model: LSTM(512) -> LSTM(512) -> Flatten
        -> Dense(num_pkgs) -> softmax, trained with categorical
        cross-entropy and rmsprop.
    """
    window_shape = (normalizers.seq_size, 1)
    model = Sequential()
    stack = (
        LSTM(512, input_shape=window_shape, return_sequences=True),
        LSTM(512, input_shape=window_shape, return_sequences=False),
        Flatten(),
        Dense(normalizers.num_pkgs),
        Activation('softmax'),
    )
    for layer in stack:
        model.add(layer)
    # loss mae, mse, categorical_crossentropy, opt rmsprop
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model

def load_model(filename, counter, normalizers): 
    """Rebuild the network with ``basic_network`` and load weights from ``filename``."""
    network = basic_network(counter, normalizers)
    network.load_weights(filename)
    return network

def train(model, nw_in, nw_out, num_epochs=100): 
    """Fit ``model`` on the windows, checkpointing the best weights.

    After each epoch the weights are written to ``weights.best.hdf5`` when
    the monitored training loss improved. Batch size is fixed at 256.
    """
    best_weights_path = "weights.best.hdf5"
    keep_best = ModelCheckpoint(
        best_weights_path, monitor='loss', verbose=1, save_best_only=True
    )
    model.fit(
        nw_in,
        nw_out,
        epochs=num_epochs,
        batch_size=256,
        callbacks=[keep_best],
    )

In [10]:
def unmap_values(complete, counter, normalizers):
    """Turn a list of package ids back into a two-part music21 stream.

    Each id in ``complete`` is looked up in the inverse of
    ``normalizers.pkg_to_int`` to get ``(id, length, id, length)``; the two
    ids are translated through the inverse of ``counter.name_to_num`` into
    a rest, a note or a chord. The first pair is appended to the top part;
    the second pair is inserted into the bottom part at the offset the top
    element received.

    Fix relative to the earlier version: when nothing could be added to the
    top part for an id (the helper returns ``None`` on its error paths),
    the bottom element is skipped instead of crashing on ``None.offset``.

    Args:
        complete: iterable of integer package ids.
        counter: provides ``name_to_num``.
        normalizers: provides ``pkg_to_int`` and the ``spc``/``mpc``/
            ``sln``/``mln`` values that are printed.

    Returns:
        ``stream.Stream([top, bottom])``.
    """
    print("spc: ", normalizers.spc)
    print("mpc: ", normalizers.mpc)
    print("sln: ", normalizers.sln)
    print("mln: ", normalizers.mln)
    top, bottom = stream.Part(), stream.Part()
    int_to_pkg = {num: pkg for pkg, num in normalizers.pkg_to_int.items()}
    print("int_to_pkg len ", len(int_to_pkg), ": ", int_to_pkg)
    num_to_name = {num: name for name, num in counter.name_to_num.items()}
    off = 0
    top.insert(0, instrument.Piano())
    bottom.insert(0, instrument.Piano())

    def add_to_stream(category, is_treble, i=0, j=1, off=0):
        """Add one element for ``category``; return it, or ``None`` if skipped."""
        if not is_treble:
            i, j = 2, 3
        music_vector = int_to_pkg[category]
        # Local is named pitch_id so it does not shadow the music21 ``pitch`` module.
        pitch_id, length = round(music_vector[i]), music_vector[j]

        if length < 0 or pitch_id < 0:
            print("ERR ", is_treble, ")", [length, pitch_id])
            return None

        if length > 8:
            length = 0.5

        if pitch_id not in num_to_name:
            print("ERR", is_treble, ")", category, "=>", music_vector, "=>", [pitch_id, length], "=>", "?")
            return None
        elem_name = num_to_name[pitch_id]

        if elem_name == 'b':
            return None  # refuse to add anything to stream

        if elem_name == 'r':
            elem = note.Rest()
        elif '-' not in elem_name:
            elem = note.Note(int(float(elem_name)))
        else:
            # "-"-joined values denote a chord
            elem = chord.Chord([int(float(x)) for x in elem_name.split('-')])

        elem.storedInstrument = instrument.Piano()
        elem.duration.quarterLength = length

        if is_treble:
            top.append(elem)
        else:
            bottom.insert(off, elem)
        print(is_treble, ")", category, "=>", music_vector, "=>", [pitch_id, length], "=>", 
              elem, ", length:", elem.duration.quarterLength, ", off:", off, ", offset:", elem.offset)
        return elem

    for category in complete:
        tre = add_to_stream(category, True, 0, 1, off)
        if tre is None:
            # No top element means there is no offset to align the bottom one with.
            print("ERR skipping bottom element for", category, "- no top element was added")
        else:
            add_to_stream(category, False, 2, 3, tre.offset)
        off += 0.5

    music_stream = stream.Stream([top, bottom])
    return music_stream

from datetime import datetime 
def stream_file(stream, title=""): 
    """Write ``stream`` as a timestamped MIDI file inside ``stream_out/``.

    The directory is created if needed and the file is addressed by path,
    so the working directory is never changed. (The earlier ``%cd``-based
    version left the process inside ``stream_out`` if the write failed and
    only ran under IPython.)

    Args:
        stream: object with a ``write('midi', fp=...)`` method.
        title: text inserted into the file name after "_demo".
    """
    import os  # local import so this cell does not depend on a later one

    out_dir = "stream_out"
    os.makedirs(out_dir, exist_ok=True)
    filename = "stream_" + str(datetime.now()) + "_demo" + title + ".mid"
    stream.write('midi', fp=os.path.join(out_dir, filename))
     

In [11]:
import copy 
def use_model(model, nw_in, counter, num_notes=100, sample_every=4): 
    """Generate a sequence of package ids from a trained model.

    Generation is seeded with the middle window of ``nw_in``. At every step
    the model sees the most recent ``seq_size`` values (seed plus everything
    generated so far) and the chosen class is appended.

    Fixes relative to the earlier version:
      * the next window is the *last* ``seq_size`` values; previously it
        lagged one step behind, so the newest prediction was never fed back
        and the seed window was predicted twice;
      * sampling probabilities are converted to float64 and renormalised,
        because ``np.random.choice`` rejects float32 outputs whose sum is
        not close enough to 1;
      * the result is converted element-wise from the stored 1-element
        arrays without relying on ``int(ndarray)``.

    Args:
        model: object with ``predict(x, verbose=0)`` returning one row of
            class probabilities for an input of shape ``(1, seq_size, 1)``.
        nw_in: sequence of windows; each window is a sequence of 1-element
            arrays.
        counter: not used by this function.
        num_notes: number of values to generate (default 100, as before).
        sample_every: every ``sample_every``-th step samples from the
            predicted distribution instead of taking the arg-max (default
            4, as before); 0 or ``None`` disables sampling.

    Returns:
        List of ints: the seed window followed by the generated ids.

    Raises:
        ValueError: if ``nw_in`` is empty.
    """
    if len(nw_in) == 0:
        raise ValueError("nw_in contains no windows to seed generation")

    # nw_in is [window, window, ...]
    start = len(nw_in) // 2
    # Keep the elements as ndarrays; only the outer container is a list.
    complete = list(nw_in[start])
    seq_size = len(complete)
    pattern_in = list(complete)

    print("starting sequence window", complete[0:10], "...")

    for idx in range(num_notes):
        print("generating note", idx, ";", end=" ")
        prediction_input = np.reshape(np.asarray(pattern_in), (1, seq_size, -1))

        prediction = model.predict(prediction_input, verbose=0)
        probabilities = np.asarray(prediction[0], dtype=np.float64)

        if sample_every and idx % sample_every == 0:
            probabilities = probabilities / probabilities.sum()
            predicted_next = np.random.choice(len(probabilities), p=probabilities)
        else:
            predicted_next = np.argmax(probabilities)

        # Store like the seed elements: 419 => array([419.], dtype=float32)
        complete.append(np.asarray(predicted_next, dtype='float32').reshape(1))
        pattern_in = complete[-seq_size:]
    return [int(np.ravel(k)[0]) for k in complete]

In [12]:
def begin_train(nw_in, nw_out, counter, normalizers): 
    """Build a fresh network, print its summary and train it for 500 epochs."""
    network = basic_network(counter, normalizers)
    network.summary()
    train(network, nw_in, nw_out, num_epochs=500)

In [None]:
import os.path 
import math 
import sys 
from datetime import datetime 
 
count = 0 

# if stop_count negative, will read all files
debug, stop_count = False, -1

# file names
nw_pickle, search = "nw.pkl", "./pianoonly-midi/"
normalize_pickle = "normalize.pkl"
!rm -v music21parse.mid unmap_demo.mid 

class Counters:
    """Mutable bookkeeping shared by the mapping and sequencing steps."""

    def __init__(self):
        # enum_count: next id to hand out for a new name (see map_values).
        # unique_frames: initialised here; not referenced elsewhere in this notebook.
        self.enum_count = self.unique_frames = 0
        # all_names: names seen in the most recently mapped song.
        self.all_names = list()
        # name_to_num: name -> integer id.
        self.name_to_num = dict()
        # mapped_notes: one list of music vectors per song.
        self.mapped_notes = list()
        self.max_duration = self.max_offset = 0

class Normalization:
    """Values produced by sequencing that are needed again for generation.

    Instances are pickled by this notebook, so the class stays a plain
    object with the same attribute names.
    """

    def __init__(self):
        # pkg_to_int: package tuple -> integer class id.
        self.pkg_to_int = {}
        # num_pkgs: number of classes; mean/std: statistics of the class ids.
        self.num_pkgs = self.mean = self.std = 0
        # seq_size: number of frames per input window.
        self.seq_size = 64
        # Reset to 0 by sequence_values and printed by unmap_values.
        self.mpc = self.spc = self.mln = self.sln = 0
     
c = Counters()
n = Normalization()
# checks if a pickled sequence file is present
if os.path.isfile(nw_pickle): 
    print("this is just to let you know that the network pickle file is being used, press any key to confirm") 
    consume = input() 
    print("reading from network pickle...") 
    # NOTE: pickle.load can execute arbitrary code; only load pickle files
    # that this notebook wrote itself.
    with open(nw_pickle, "rb") as nwfile:
        print("(1/2) loading...") 
        nw = pickle.load(nwfile) 
    print("(2/2) parsing...") 
    # Layout written by the save step: [nw_in, nw_out, enum_count, max_duration, name_to_num]
    nw_in, nw_out, c.enum_count, c.max_duration, c.name_to_num = nw[0], nw[1], nw[2], nw[3], nw[4] 
    print("nw_out has", len(nw_out), "notes, about", math.ceil(len(nw_out)/450), "songs") 
    print("enum_count", nw[2], "max_duration", nw[3]) 
    print("name_to_num has keys", list(nw[4].keys())[:5], "...") 
    print("done, starting to train...")

    print("reading from normalizers pickle...")
    # The file is now closed after loading; previously the handle was never closed.
    with open(normalize_pickle, "rb") as nrmlfile:
        print("(1/2) loading...") 
        n = pickle.load(nrmlfile)
    print("(2/2) parsing...") 
    print("there are", len(n.pkg_to_int), "categories") 

    # checks if trained model is present 
    if os.path.isfile("weights.best.hdf5"): 
        print("generating notes...")
        model = load_model("weights.best.hdf5", c, n) 
        result = use_model(model, nw_in, c) 
        print("RESULT", result) 
        result_stream = unmap_values(result, c, n) 
        print("Output result midi file")
        result_stream.write('midi', fp="generated_"+str(datetime.now())+"_demo.mid")
        print("midi file generated...")
    else: 
        print("No weights, training model...")
        begin_train(nw_in, nw_out, c, n) 

    # Stop here so the MIDI-parsing path below does not run. This replaces a
    # bare `raise`, which only halted by failing with
    # "RuntimeError: No active exception to reraise".
    raise SystemExit(0)
 
print("no pickle yet, reading data from midis...") 
all_files = sorted(glob.glob(search+"*.mid") + glob.glob(search+"*.xml")) # piano_midis 
# A negative or too-large stop_count means "use every file found".
if stop_count < 0 or stop_count > len(all_files):
    stop_count = len(all_files)
all_files = all_files[:stop_count]
all_pitches = []
 
print("-----", "(", "debug" if debug else "", len(all_files), "files", ")", "-----", end="\n\n") 
 
for mfile in all_files: 
    ret = isolate(mfile) 
    count += 1 
    if not ret:
        # isolate() found nothing usable in this file
        continue
    print("(", count, "/", stop_count, ") ", ret[0], sep="") 
    piano_part = ret[1] 
    # Only when the very first file yields a part is it written out for inspection.
    if count == 1: 
        piano_part.write('midi', fp="music21parse.mid") 
    notes, lengths, pitches, offsets = parse_notes(piano_part) 
    all_pitches.extend(pitches) 
    vector_list, c = map_values(notes, lengths, pitches, offsets, c)
    c.mapped_notes.append(vector_list) 

# Largest value found at index 1 of any music vector (0 if there are none).
max_duration = max(
    [0] + [music_vector[1] for vector_list in c.mapped_notes for music_vector in vector_list]
)
c.max_duration = max_duration

if debug: 
    print(c.mapped_notes) 
    print("sequence_values on", c.mapped_notes, c.enum_count, c.name_to_num, max_duration, sep="\n") 
 
# Build the network input/output and pick up the updated bookkeeping objects.
nw_in, nw_out, c, n = sequence_values(c, n)

############################################################################### 
# save sequences into pickle 
print("saving into sequences pickle...") 
print("(1/2) network...") 
nw = [nw_in, nw_out, c.enum_count, c.max_duration, c.name_to_num] 
print("(2/2) dumping...") 
with open(nw_pickle, 'wb') as nwfile:
    pickle.dump(nw, nwfile) 

print("saving into normalizers pickle...")
# Close the file before training starts: previously this handle was never
# closed, so the pickle could still be unflushed during the long training run.
with open(normalize_pickle, 'wb') as nrmlfile:
    pickle.dump(n, nrmlfile)
print("done, starting to train...") 
begin_train(nw_in, nw_out, c, n) 