In [30]:
import numpy as np
import random, networkx as nx
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import to_categorical
# load ascii text and covert to lowercase
# create mapping of unique chars to integers



def kernPause(a1,a2):
    return  1*(a1==a2)

def kernDuration(d1,d2):
    return min(d1+1,d2+1)/max(d1+1,d2+1)

def kernVolume(v1,v2):
    return min(v1+1,v2+1)/max(v1+1,v2+1)


def kernTempo(t1,t2):
    return min(t1+1,t2+1)/max(t1+1,t2+1)

def getlowestfraction(x0):
    eps = 0.01

    x = np.abs(x0)
    a = np.floor(x)
    h1 = 1
    k1 = 0
    h = a
    k = 1

    while np.abs(x0-h/k)/np.abs(x0)> eps:
        x = 1/(x-a)
        a = np.floor(x)
        h2 = h1
        h1 = h
        k2 = k1
        k1 = k
        h = h2 + a*h1
        k = k2 + a*k1
    q = {"numerator": h, "denominator" :k}
    return q



def getRational(k):
    x = 2**(k*(1/12.0))
    return getlowestfraction(x)

def gcd(a,b):
    a = abs(a)
    b = abs(b)
    if (b > a):
        temp = a
        a = b
        b = temp
    while True:
        if (b == 0):
            return a
        a %= b
        if (a == 0):
            return b
        b %= a

def kernPitch(k1,k2):
    q = getRational(k2-k1)
    a,b = q["numerator"],q["denominator"]
    return gcd(a,b)**2/(a*b)

def parse_file(xml):
    import music21 as m21
    import numpy as np
    xml_data = m21.converter.parse(xml)
    score = []
    instruments = []
    for part in xml_data.parts:
        parts = []
        instruments.append(part.getInstrument())
        for note in part.recurse().notesAndRests:
            if note.isRest:
                start = note.offset
                duration = float(note.quarterLength)/4.0
                vol = 32 #note.volume.velocity
                pitch = 60
                parts.append((note,[(pitch,duration,vol,True)]))
            elif note.isChord:
                chord = []
                vols = ([n.volume.velocity for n in note if not n.volume.velocity is None])
                if len(vols)==0:
                    vol = 64
                else:
                    vol = min(vols)
                for n in note:
                    start = n.offset
                    duration = float(note.quarterLength)/4.0
                    pitch = n.pitch.midi
                    #print(pitch,duration,note.volume)
                    chord.append((pitch,duration,vol,False))
                parts.append((note,chord))   
                if vol is None:
                    vol = int(64)
                parts.append((note,[(pitch,duration,vol,False)]))    
            else:
                #print(note)
                start = note.offset
                duration = float(note.quarterLength)/4.0
                pitch = note.pitch.midi
                #print(pitch,duration,note.volume)
                vol = note.volume.velocity
                if vol is None:
                    vol = int(note.volume.realized * 127)
                parts.append((note,[(pitch,duration,vol,False)]))    
        score.append(parts)        
    return score,instruments




def getCoordinatesOf(intList,kernel=kernPitch,nDim=None):
    M0 = np.array([[kernel(t1,t2) for t1 in intList] for t2 in intList])
    #print(M0)
    from sklearn.decomposition import PCA
    from sklearn.decomposition import KernelPCA
    from sklearn.preprocessing import MinMaxScaler,StandardScaler
    scaler = StandardScaler() #MinMaxScaler((0,1))
    KPCA = KernelPCA(n_components=nDim,kernel='precomputed',eigen_solver='randomized')
    
    Ch0 = KPCA.fit_transform((M0))
    #print(Ch0)
    X0 = [x for x in 1.0*Ch0]    
    
    #print(X0)
        
    #X0 = scaler.fit_transform(X0)
    
    #invPitchDict = dict(zip(intList,range(len(intList))))
    return Ch0#, invPitchDict

def getDict(typ="pitch",mylist=[]):
    ss = list(sorted(set(mylist)))
    if typ=="pitch":
        kern = kernPitch
    if typ == "duration":
        kern = kernDuration
    if typ == "volume":
        kern = kernVolume
    if typ == "rest":
        kern = kernPause
    cholPitch = getCoordinatesOf(ss,kernel=kern,nDim=None)    
    pitchDict = dict(zip(range(len(ss)),cholPitch))
    return ss,(pitchDict)

def getDictOld(typ="pitch",mylist=[]):
    ss = list(sorted(set(mylist)))
    if typ=="pitch":
        kern = kernPitch
    if typ == "duration":
        kern = kernDuration
    if typ == "volume":
        kern = kernVolume
    if typ == "rest":
        kern = kernPause
    gramPitch = np.array([[kern(p1,p2) for p1 in ss] for p2 in ss])
    cholPitch = np.linalg.cholesky(gramPitch)
    from sklearn.decomposition import PCA
    pca = PCA(n_components=2)
    cholPitch = pca.fit_transform(cholPitch)
    pitchDict = dict(zip(range(len(ss)),cholPitch.tolist()))
    return ss,(pitchDict)

def prepareXYData(pitchList,pitches,pitchDict,seq_length):
    pitch_to_int = dict((c, i) for i, c in enumerate(pitchList))
    # summarize the loaded data
    n_chars = len(pitches)
    print("Total Characters: ", n_chars)
    # prepare the dataset of input to output pairs encoded as integers
    dataX = []
    dataY = []
    for i in range(0, n_chars - seq_length, 1):
	    seq_in = pitches[i:i + seq_length]
	    seq_out = pitches[i + seq_length]
	    dataX.append([pitchDict[pitchList.index(pitch)] for pitch in seq_in])
	    dataY.append(pitch_to_int[seq_out])
    n_patterns = len(dataX)
    print("Total Patterns: ", n_patterns)
    X = np.array(dataX)

    # one hot encode the output variable
    y = to_categorical(dataY)
    return X,y


def fitModel(X,y,prefix="pitches-",dim=256,epochs=20,batch_size=128,return_model_if_exists=True,load_model_if_exists=True):
    model = Sequential()
    print("X.shape = ",X.shape)
    model.add(LSTM(dim, input_shape=(X.shape[1],X.shape[2])))
    model.add(Dropout(0.2))
    model.add(Dense(y.shape[1], activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # define the checkpoint
    filepath="./checkpoints/"+prefix+"weights-improvement.hdf5"
    # if checkpoints exist, load the weights into the model:
    from pathlib import Path
    myfile = Path(filepath)
    if myfile.is_file() and load_model_if_exists:
        model.load_weights(filepath)
        print(prefix,"loaded")
        if return_model_if_exists:
            return model
    checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_best_only=True, mode='min')
    callbacks_list = [checkpoint]
    # fit the model
    model.fit(X, y, epochs=epochs, batch_size=batch_size, callbacks=callbacks_list)
    return model

def writePitches(fn,inds,tempo=82,instrument=[0,0],add21=True,start_at= [0,0],durationsInQuarterNotes=False):
    from MidiFile import MIDIFile

    track    = 0
    channel  = 0
    time     = 0   # In beats
    duration = 1   # In beats # In BPM
    volume   = 116 # 0-127, as per the MIDI standard

    ni = len(inds)
    MyMIDI = MIDIFile(ni,adjust_origin=False) # One track, defaults to format 1 (tempo track
                     # automatically created)
    MyMIDI.addTempo(track,time, tempo)


    for k in range(ni):
        MyMIDI.addProgramChange(k,k,0,instrument[k])


    times = start_at
    for k in range(len(inds)):
        channel = k
        track = k
        for i in range(len(inds[k])):
            pitch,duration,volume,isPause,tempo = inds[k][i]
            #print(pitch,duration,volume,isPause)
            track = k
            channel = k
            if not durationsInQuarterNotes:
                duration = 4*duration#*maxDurations[k] #findNearestDuration(duration*12*4)            
            #print(k,pitch,times[k],duration,100)
            if not isPause: #rest
                #print(volumes[i])
                # because of median:
                pitch = int(np.floor(pitch))
                if add21:
                    pitch += 21
                #print(pitch,times[k],duration,volume,isPause)    
                MyMIDI.addTempo(track,times[k], tempo)
                #print("track, channel = ",track,channel)
                MyMIDI.addNote(track, channel, int(pitch), float(times[k]) , float(duration), int(volume))
                times[k] += duration*1.0  
            else:
                times[k] += duration*1.0
       
    with open(fn, "wb") as output_file:
        MyMIDI.writeFile(output_file)
    print("written")  

def sublist(sublst, lst): 
    return sum([sublst == lst[i: i + len(sublst)] for i in range(len(lst) - len(sublst)+1)]) > 0
    
    
def generate(model,startpitches,length,duration_in_min,tempo,pitchList,pitchDict,seq_length,random_proportion):
    tenPercent = min(1,int(seq_length*random_proportion))
    if not length is None:
        for k in range(length):
            dataX = []
            lastpitches = startpitches[-seq_length:]
            if sublist(lastpitches,startpitches[1:]):
                for tp in range(tenPercent):
                    lastpitches[tp] = np.random.choice(startpitches,1)[0]
            dataX.append([pitchDict[pitchList.index(pitch)] for pitch in lastpitches])
            xvec = np.array(dataX)
            result = pitchList[np.argmax(model.predict(xvec))]
            startpitches.append(result)
        return startpitches[seq_length:]    
    elif (not (duration_in_min is None)) and  (not (tempo is None)):
        sumdur = 0
        while sumdur < duration_in_min*tempo:
            dataX = []
            lastpitches = startpitches[-seq_length:]
            if sublist(lastpitches,startpitches[1:]):
                
                lastpitches[0] = np.random.choice(startpitches,1)[0]
            dataX.append([pitchDict[pitchList.index(pitch)] for pitch in lastpitches])
            xvec = np.array(dataX)
            result = pitchList[np.argmax(model.predict(xvec))]
            startpitches.append(result)
            sumdur = sumdur+result*4 # result is a duration in quarter-beats 
        return startpitches[seq_length:]    


def generateNotesFromPartInScore(prefix_for_checkpoint,part,partNr,tempo,seq_length,length,duration_in_min,dim,epochs,batch_size,return_model_if_exists,load_model_if_exists,random_proportion):
    pitches = []
    durations = []
    volumes = []
    pauses = []
    
    pitches.extend([x[1][0][0] for x in part])
    durations.extend([x[1][0][1] for x in part])
    volumes.extend([x[1][0][2] for x in part])
    pauses.extend([x[1][0][3] for x in part])
    
    pitchList,pitchDict = getDict("pitch",pitches)
    durationList,durationDict = getDict("duration",durations)
    volumeList,volumeDict = getDict("volume",volumes)
    pauseList,pauseDict = getDict("rest",pauses)
    
    XPitches,yPitches = prepareXYData(pitchList,pitches,pitchDict,seq_length)
    XDurations,yDurations = prepareXYData(durationList,durations,durationDict,seq_length)
    XVolumes,yVolumes = prepareXYData(volumeList,volumes,volumeDict,seq_length)
    XPauses,yPauses = prepareXYData(pauseList,pauses,pauseDict,seq_length)
    
    modelPitches = fitModel(XPitches,yPitches,prefix_for_checkpoint+"pitches-"+str(partNr)+"-",dim,epochs,batch_size,return_model_if_exists,load_model_if_exists)
    modelDurations = fitModel(XDurations,yDurations,prefix_for_checkpoint+"durations-"+str(partNr)+"-",dim,epochs,batch_size,return_model_if_exists,load_model_if_exists)
    modelVolumes = fitModel(XVolumes,yVolumes,prefix_for_checkpoint+"volumes-"+str(partNr)+"-",dim,epochs,batch_size,return_model_if_exists,load_model_if_exists)
    modelPauses = fitModel(XPauses,yPauses,prefix_for_checkpoint+"pauses-"+str(partNr)+"-",dim,epochs,batch_size,return_model_if_exists,load_model_if_exists)
    
    r = np.random.choice(range(len(durations)-seq_length),1)[0]
    new_durations = generate(modelDurations,durations[r:r+seq_length],None,duration_in_min,tempo,durationList,durationDict,seq_length,random_proportion)    
    
    length = len(new_durations)
    new_pitches = generate(modelPitches,pitches[r:r+seq_length],length,None,tempo,pitchList,pitchDict,seq_length,random_proportion)    
    new_volumes = generate(modelVolumes,volumes[r:r+seq_length],length,None,tempo,volumeList,volumeDict,seq_length,random_proportion)    
    new_pauses = generate(modelPauses,pauses[r:r+seq_length],length,None,tempo,pauseList,pauseDict,seq_length,random_proportion)    
    
    models = modelPitches,modelDurations,modelVolumes,modelPauses
    
    notes = []
    for i in range(length):
        pitch = new_pitches[i]
        duration = new_durations[i]
        volume = new_volumes[i]
        rest = new_pauses[i]
        notes.append((pitch,duration,volume,rest,tempo))
    return notes,models    


def writePickleBz2(fn,data):
    import bz2,pickle
    ofile = bz2.BZ2File(fn,'wb')
    pickle.dump(data,ofile)
    ofile.close()    
    
def readPickleBz2(fn):
    import bz2, pickle
    ifile = bz2.BZ2File(fn,'rb')
    newdata = pickle.load(ifile)
    ifile.close()
    return newdata

In [40]:
import zipfile

# zip file handler  
zipf = zipfile.ZipFile('./midi/classical_piano.zip')

# list available files in the container
filelist = sorted([z for z in zipf.namelist() if z.endswith(".mid") and ("ty_januar" in z or "ty_februar" in z)])
#np.random.shuffle(filelist)
maxFn = 100


# extract a specific file from the zip container
#f = zip.open("file_inside_zip.txt")

# save the extraced file 
#content = f.read()
# only piano
leftpart = []
rightpart =  []
c = 0
for fn in filelist:
    if c>=maxFn:
        continue
    f = zipf.open(fn)
    content = f.read()
    f.close()
    with open("./midi/tmp.mid","wb") as g:
        g.write(content)  
    score,inst = parse_file("./midi/tmp.mid")
    if len(score)==2 and c < maxFn:
        print(fn,inst)
        leftpart.extend(score[0])
        rightpart.extend(score[1])
        c+=1
#print(score)

score = [leftpart,rightpart]

writePickleBz2("./data/traindata-tschai-jan-feb.pkl.bz2",score)



tschai/ty_februar.mid [<music21.instrument.Piano 'Piano right: Piano right'>, <music21.instrument.Piano 'Piano left: Piano left'>]
tschai/ty_januar.mid [<music21.instrument.Piano 'Piano right: Piano right'>, <music21.instrument.Piano 'Piano left: Piano left'>]


In [44]:
#score = readPickleBz2("./data/traindata-beethoven-elise-mond_3.pkl.bz2")
score = readPickleBz2("./data/traindata-tschai-jan-feb.pkl.bz2")
tempo = 80
iinds = []
mmodels = []
rmie = [True,True]
lmie = [True,True]
seq_length = 100
dim = 512
batch_size = 256
epochs = 50
duration_in_min = 4
random_proportion = 0.25
prefix_for_checkpoint = "tschai-jan-febr-seq_length="+str(seq_length)+"-dim="+str(dim)+"-batch_size="+str(batch_size)+"-"
#prefix_for_checkpoint = ""
for i in range(len(score)):
    part = score[i]
    notes,models = generateNotesFromPartInScore(prefix_for_checkpoint=prefix_for_checkpoint,part=part,partNr=i,tempo=tempo,seq_length=seq_length,length=500,duration_in_min=duration_in_min,dim=dim,epochs=epochs,batch_size=batch_size,return_model_if_exists=rmie[i],load_model_if_exists=lmie[i],random_proportion=random_proportion)
    mmodels.append(models)
    #notes = generateNotesFromPartInScore(part,partNr=i,tempo=tempo,seq_length=30,length=1000,dim=512,epochs=2,batch_size=256,return_model_if_exists=True)
    iinds.append(notes)

fn = "./midi/tschai-jan-febr_with_lstm_11.mid"    
writePitches(fn,iinds,tempo=tempo,instrument=[0,0],add21=False,start_at= len(iinds)*[0],durationsInQuarterNotes=False)        


Total Characters:  2152
Total Patterns:  2052
Total Characters:  2152
Total Patterns:  2052
Total Characters:  2152
Total Patterns:  2052
Total Characters:  2152
Total Patterns:  2052
X.shape =  (2052, 100, 50)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-pitches-0- loaded
X.shape =  (2052, 100, 23)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-durations-0- loaded
X.shape =  (2052, 100, 66)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-volumes-0- loaded
X.shape =  (2052, 100, 1)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-pauses-0- loaded








Total Characters:  1636
Total Patterns:  1536
Total Characters:  1636
Total Patterns:  1536
Total Characters:  1636
Total Patterns:  1536
Total Characters:  1636
Total Patterns:  1536
X.shape =  (1536, 100, 45)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-pitches-1- loaded
X.shape =  (1536, 100, 28)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-durations-1- loaded
X.shape =  (1536, 100, 61)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-volumes-1- loaded
X.shape =  (1536, 100, 1)
tschai-jan-febr-seq_length=100-dim=512-batch_size=256-pauses-1- loaded






























written


In [8]:
score = readPickleBz2("./data/traindata-beethoven.pkl.bz2")
tempo = 80
iinds = []
mmodels = []
rmie = [False,False]
lmie = [True,True]
seq_length = 100
dim = 512
batch_size = 256
epochs = 3
duration_in_min = 4
prefix_for_checkpoint = "elise-seq_length="+str(seq_length)+"-dim="+str(dim)+"-batch_size="+str(batch_size)+"-"
for i in range(len(score)):
    part = score[i]
    notes,models = generateNotesFromPartInScore(prefix_for_checkpoint=prefix_for_checkpoint,part=part,partNr=i,tempo=tempo,seq_length=seq_length,length=500,duration_in_min=duration_in_min,dim=dim,epochs=epochs,batch_size=batch_size,return_model_if_exists=rmie[i],load_model_if_exists=lmie[i])
    mmodels.append(models)
    #notes = generateNotesFromPartInScore(part,partNr=i,tempo=tempo,seq_length=30,length=1000,dim=512,epochs=2,batch_size=256,return_model_if_exists=True)
    iinds.append(notes)

fn = "./midi/for_elise_with_lstm_20.mid"    
writePitches(fn,iinds,tempo=tempo,instrument=[0,0],add21=False,start_at= len(iinds)*[0],durationsInQuarterNotes=False)        


Total Characters:  700
Total Patterns:  600
Total Characters:  700
Total Patterns:  600
Total Characters:  700
Total Patterns:  600
Total Characters:  700
Total Patterns:  600
X.shape =  (600, 100, 37)
elise-seq_length=100-dim=512-batch_size=256-pitches-0- loaded
Epoch 1/3
Epoch 1: loss improved from inf to 0.36478, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-pitches-0-weights-improvement.hdf5
Epoch 2/3
Epoch 2: loss improved from 0.36478 to 0.27652, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-pitches-0-weights-improvement.hdf5
Epoch 3/3
Epoch 3: loss improved from 0.27652 to 0.22309, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-pitches-0-weights-improvement.hdf5
X.shape =  (600, 100, 15)
elise-seq_length=100-dim=512-batch_size=256-durations-0- loaded
Epoch 1/3
Epoch 1: loss improved from inf to 0.84780, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-durations-0-weights-i































Total Characters:  512
Total Patterns:  412
Total Characters:  512
Total Patterns:  412
Total Characters:  512
Total Patterns:  412
Total Characters:  512
Total Patterns:  412
X.shape =  (412, 100, 24)
elise-seq_length=100-dim=512-batch_size=256-pitches-1- loaded
Epoch 1/3
Epoch 1: loss improved from inf to 0.43598, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-pitches-1-weights-improvement.hdf5
Epoch 2/3
Epoch 2: loss did not improve from 0.43598
Epoch 3/3
Epoch 3: loss improved from 0.43598 to 0.38872, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-pitches-1-weights-improvement.hdf5
X.shape =  (412, 100, 8)
elise-seq_length=100-dim=512-batch_size=256-durations-1- loaded
Epoch 1/3
Epoch 1: loss improved from inf to 0.39023, saving model to ./checkpoints/elise-seq_length=100-dim=512-batch_size=256-durations-1-weights-improvement.hdf5
Epoch 2/3
Epoch 2: loss improved from 0.39023 to 0.37830, saving model to ./checkpoints/elise-seq_l





















written


In [18]:
def writeM21Lists(fn,inds,tempo,instruments,title,author,fileType):
    from music21 import chord
    from music21 import stream
    from music21 import duration
    from music21 import clef, metadata
    import music21 as m
    import copy

    score = stream.Score()
    score.insert(0, metadata.Metadata())
    score.metadata.title = title
    score.metadata.composer = author
    tm = m.tempo.MetronomeMark(number=tempo)
    score.append(tm)
    lh = m.stream.Part()
    lh.append(m.instrument.Piano())
    rh = m.stream.Part()
    rh.append(m.instrument.Piano()) #Violin())
    
    def extendPart(part,ll):
        for l in ll:
            part.append(l)
        return part
    
    ourparts = []    
    for i in range(len(inds)):
        mypart = m.stream.Part()
        mypart.append(instruments[i])
        mypart = extendPart(mypart, inds[i])
        ourparts.append(mypart)
    
    for part in ourparts:
        score.append(part)
    if fileType == "musicxml":    
        score.write("musicxml",fp=fn) 
    elif fileType == "mid":    
        score.write("mid",fp=fn) 
    print("written")    
import music21 as m21
fn = "./midi/beethoven_with_lstm_5.musicxml"    
tempo = 80
instruments = 2*[m21.instrument.Piano()]
title = "Music with LSTM and positive definite kernels"
author = "musescore1983"
fileType = "musicxml"
inds = []
for notes in iinds:
    mnotes = []
    for note in notes:
        pitch, duration, volume, pause,tempo = note
        P = m21.pitch.Pitch(pitch)
        D = m21.duration.Duration(duration*4)
        V = m21.volume.Volume(volume)
        if pause:
            nn = m21.note.Rest(duration=D)
        else:
            nn = m21.note.Note(pitch=P,duration=D,volume=V)
        mnotes.append(nn)    
    inds.append(mnotes) 
        
writeM21Lists(fn,inds,tempo,instruments,title,author,fileType)        

written
