In [1]:
import os,sys,time
import numpy as np
from scipy.io import wavfile
import sklearn

from IPython.display import Audio

import musiclib, database

%load_ext cython

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


# DTW

In [2]:
%%cython
import numpy as np
cimport numpy as np
cimport cython
from libc.math cimport sqrt

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef align(float[:,:] sig1,float[:,:] sig2):
    cdef int d = sig1.shape[1]
    cdef int len1 = sig1.shape[0]
    cdef int len2 = sig2.shape[0]
    cdef np.ndarray[np.float32_t, ndim=2] npL = np.empty((len1,len2), dtype=np.float32)
    cdef np.ndarray[np.float32_t, ndim=2] npP = np.empty((len1,len2), dtype=np.float32)
    
    cdef float[:,:] L = npL
    cdef float[:,:] P = npP
    
    cdef float cost,tmp
    cdef int j,k,i
    for j in range(0,len1):
        for k in range(0,len2):
            cost = 0
            for i in range(d):
                tmp = sig1[j,i] - sig2[k,i]
                cost += tmp * tmp
            cost = sqrt(cost)
            
            if j == 0 and k == 0:
                L[j,k] = cost
                P[j,k] = 3
            elif k == 0:
                L[j,k] = cost + L[j-1,k]
                P[j,k] = 2
            elif j == 0:
                L[j,k] = cost + L[j,k-1]
                P[j,k] = 1
            else: # j, k > 0
                if L[j-1,k] < L[j,k-1] and L[j-1,k] < L[j-1,k-1]: # insertion (up)
                    P[j,k] = 1
                    L[j,k] = cost + L[j-1,k]
                elif L[j,k-1] < L[j-1,k-1]: # deletion (left)
                    P[j,k] = 2
                    L[j,k] = cost + L[j,k-1]
                else: # match (up left)
                    P[j,k] = 3
                    L[j,k] = cost + L[j-1,k-1]
    
    return npL,npP

In [3]:
%%cython
import numpy as np
cimport numpy as np
cimport cython
from libc.math cimport sqrt

def traceback_loss(float[:,:] sig1,float[:,:] sig2, float[:,:] L):
    sig12 = np.zeros(sig2.shape) # align 1 onto 2
    cdef int j = sig1.shape[0]-1
    cdef int k = sig2.shape[0]-1
    A = []
    C = []
    cdef float cost,tmp
    while True:
        if j == 0 and k == 0:
            A.append((0,0))
            C.append(L[0,0])
            break # got back to the beginning
        
        cost = 0
        for i in range(sig1.shape[1]):
            tmp = sig1[j,i] - sig2[k,i]
            cost += tmp * tmp
        cost = sqrt(cost)
        
        if j>0 and k>0 and L[j,k] == L[j-1,k-1] + cost: # progress
            A.append((j,k))
            C.append(L[j,k])
            j -= 1
            k -= 1
        elif k>0 and L[j,k] == L[j,k-1] + cost: # stay sig2
            A.append((j,k))
            C.append(L[j,k])
            k -= 1
        elif j>0 and L[j,k] == L[j-1,k] + cost: # stay sig1
            A.append((j,k))
            C.append(L[j,k])
            j -= 1
        else: 
#             print 'j',j
#             print 'k',k
#             print 'cost',cost
#             print 'L[j,k]',L[j,k]
#             print 'L[j,k]-cost',L[j,k]-cost
#             print 'L[j,k-1]',L[j,k-1]
#             print 'L[j-1,k]',L[j-1,k]
#             print 'L[j-1,k-1]',L[j-1,k-1]
            assert False
    
    return list(reversed(A)),list(reversed(C))

In [4]:
window_size=2048
stride=512
cutoff=int(50*(window_size/2048.))

In [5]:
record = 'MIDI-Unprocessed_R1_D2-13-20_mid--AUDIO-from_mp3_16_R1_2015_wav--1.wav'
synth = 'wtc1p19.wav'

fs, data1 = wavfile.read(record)
left_pad = 1*fs
right_pad = 0*fs
data1 = data1[0:int(9.8*fs)]
data1 = np.concatenate((np.zeros((left_pad,2)),data1),axis=0)
frep1 = database.featurize(data1,fs,musiclib.feature,window_size,stride=stride,normalize=False)

fs, data2 = wavfile.read(synth)
data2 = data2[0:fs*10]
data2 = np.concatenate((np.zeros((left_pad,2)),data2),axis=0)
frep2 = database.featurize(data2,fs,musiclib.feature,window_size,stride=stride,normalize=False)



In [6]:
Audio(data1[:,0],rate=fs)

In [7]:
Audio(data2[:,0],rate=fs)

In [8]:
# Align signals
start = time.time()
L,P = align(frep1[0:cutoff].T.astype(np.float32),frep2[0:cutoff].T.astype(np.float32))
end = time.time()
print('Elapsed time: ' + str(end - start))

Elapsed time: 0.08806896209716797


In [9]:
# Find optimal path
path,costs = traceback_loss(frep1[0:cutoff].T.astype(np.float32),frep2[0:cutoff].T.astype(np.float32),L)

In [10]:
path1 = np.array([x[0] for x in path])
path2 = np.array([x[1] for x in path])

In [11]:
# Find corresponding onsets on the performance
notes_onsets_offsets_2 = musiclib.load_midi('wtc1p19.mid')
for i, e in list(enumerate(notes_onsets_offsets_2)):
    notes_onsets_offsets_2[i] = (e[0], (e[1]*fs + left_pad)/stride, (e[2]*fs + left_pad)/stride)

notes_onsets_offsets_1 = []    
for note, onset, offset in notes_onsets_offsets_2:
    if (onset > path2[-1] - right_pad/stride) or (offset > path2[-1] - right_pad/stride):
        break
    sig2_onset_index = np.argmax(path2>=onset)
    sig2_offset_index = np.argmax(path2>=offset)
    sig1_onset = path1[sig2_onset_index]
    sig1_offset = path1[sig2_offset_index]
    notes_onsets_offsets_1.append((note, sig1_onset, sig1_offset))
notes_onsets_offsets_1 = np.array(notes_onsets_offsets_1)


print(notes_onsets_offsets_1)

length of midi file69.2797896166662
[[ 45 175 223]
 [ 69 175 188]
 [ 71 188 201]
 [ 73 201 212]
 [ 69 212 223]
 [ 78 223 249]
 [ 57 223 280]
 [ 78 249 280]
 [ 56 280 338]
 [ 78 280 293]
 [ 75 293 308]
 [ 76 308 338]
 [ 55 338 393]
 [ 76 351 363]
 [ 78 363 376]
 [ 64 363 418]
 [ 79 376 393]
 [ 54 393 448]
 [ 81 393 404]
 [ 79 404 418]
 [ 57 418 448]
 [ 78 418 433]
 [ 81 433 448]
 [ 59 448 500]
 [ 74 448 462]
 [ 73 462 473]
 [ 62 473 526]
 [ 71 473 486]
 [ 81 486 500]
 [ 52 500 552]
 [ 80 500 513]
 [ 78 513 526]
 [ 56 526 552]
 [ 76 526 539]
 [ 80 539 552]
 [ 73 552 567]
 [ 57 552 608]
 [ 71 567 580]
 [ 69 580 593]
 [ 61 580 634]
 [ 79 593 608]
 [ 78 608 621]
 [ 50 608 662]
 [ 76 621 634]
 [ 74 634 648]
 [ 59 634 648]
 [ 57 648 662]
 [ 78 648 662]
 [ 56 662 702]
 [ 52 662 718]
 [ 71 662 672]
 [ 78 672 685]
 [ 76 685 702]
 [ 74 702 718]
 [ 56 702 718]
 [ 73 718 729]
 [ 57 718 743]
 [ 45 718 743]
 [ 71 729 743]
 [ 57 743 766]
 [ 69 743 752]
 [ 71 752 766]
 [ 56 766 793]
 [ 73 766 781]
 [ 7

In [12]:
# Find the correct onsets from the maestro midi file wtc1p19.mid
notes_onsets_offsets_correct = musiclib.load_midi('MIDI-Unprocessed_R1_D2-13-20_mid--AUDIO-from_mp3_16_R1_2015_wav--1.midi')

for i, e in list(enumerate(notes_onsets_offsets_correct)):
    notes_onsets_offsets_correct[i] = (e[0], (e[1]*fs + left_pad)/stride, (e[2]*fs + left_pad)/stride)

notes_onsets_offsets_correct_temp = []

for note, onset, offset in notes_onsets_offsets_correct:
    if (onset > path2[-1] - right_pad/stride) or (offset > path2[-1] - right_pad/stride):
        break
    notes_onsets_offsets_correct_temp.append((note, onset, offset))
notes_onsets_offsets_correct = np.array(notes_onsets_offsets_correct_temp)

print(notes_onsets_offsets_correct)

length of midi file212.3843750000017
[[ 69.         173.52172852 183.83972168]
 [ 45.         174.68811035 203.84765625]
 [ 71.         185.72387695 194.60632324]
 [ 73.         198.73352051 205.55236816]
 [ 69.         210.66650391 220.26672363]
 [ 57.         224.66308594 277.77832031]
 [ 78.         225.2911377  232.64831543]
 [ 78.         253.1048584  259.47509766]
 [ 56.         282.08496094 335.11047363]
 [ 78.         282.08496094 296.17126465]
 [ 75.         294.73571777 302.72094727]
 [ 76.         308.64257812 339.6862793 ]
 [ 55.         336.63574219 390.0201416 ]
 [ 76.         350.99121094 360.68115234]
 [ 78.         364.44946289 372.88330078]
 [ 64.         364.71862793 416.66748047]
 [ 79.         377.45910645 386.0723877 ]
 [ 54.         391.72485352 444.12231445]
 [ 81.         391.90429688 407.06726074]
 [ 79.         405.99060059 419.6282959 ]
 [ 57.         418.91052246 423.5760498 ]
 [ 78.         418.91052246 430.03601074]
 [ 81.         431.83044434 440.9820556

In [13]:
# Lets see how a perfect alignment would sound. You will here both onsets and offsets(offsets are hard to hear).
new_notes_onsets_offsets = notes_onsets_offsets_correct.copy()
for i, e in list(enumerate(new_notes_onsets_offsets)):
    new_notes_onsets_offsets[i] = (e[0], e[1]*stride, e[2]*stride)
out2 = musiclib.mark_notes_with_offsets(data1[:,0], new_notes_onsets_offsets)
wavfile.write('test.wav',fs,(.3*out2 + .7*data1[:,0]).astype(np.int16))
Audio(0.3*out2 + 0.7*data1[:,0],rate=fs)

In [14]:
# Now lets see how our alignment sounds. You will here both onsets and offsets(offsets are hard to hear).
new_notes_onsets_offsets = notes_onsets_offsets_1.copy()
for i, e in list(enumerate(new_notes_onsets_offsets)):
    new_notes_onsets_offsets[i] = (e[0], e[1]*stride, e[2]*stride)
out2 = musiclib.mark_notes_with_offsets(data1[:,0], new_notes_onsets_offsets)
wavfile.write('test.wav',fs,(.3*out2 + .7*data1[:,0]).astype(np.int16))
Audio(0.3*out2 + 0.7*data1[:,0],rate=fs)

# Evaluation

In [15]:
x = np.zeros((data1[:,0].shape[0], 128), dtype=bool)
y = np.zeros((data1[:,0].shape[0], 128), dtype=bool)

# Discretize maestro 
for note, onset, offset in notes_onsets_offsets_correct:
    onset = int(onset*stride)
    offset = int(offset*stride)
    for i in range(onset, offset):
        if (onset > x.shape[0] or offset > x.shape[0]):
            print("onset : " + str(onset) + " offset " + str(offset))
            break
        else:
            x[i][int(note)] = True  
        
# Discretize aligned midi         
for note, onset, offset in notes_onsets_offsets_1:
    onset = int(onset*stride)
    offset = int(offset*stride)
    for i in range(onset, offset):
        if (onset > y.shape[0] or offset > y.shape[0]):
            print("onset : " + str(onset) + " offset " + str(offset))
            break
        y[i][int(note)] = True

# Error conditions :
# 1) Something is playing in maestro and nothing is playing in alignment
# 2) Something is playing in alignment and nothing is playing in maestro
# This can be efficiently calculated using xor.
z = np.logical_xor(x, y)
print("Alignment Error: ", np.sum(z))

onset : 475499 offset 479863
onset : 475820 offset 481470
Alignment Error:  301379


## Miscellaneous evaluators

In [None]:
# onsets_correct : The onsets from the maestro midi file
# onsets_predicted : The onsets generated from alignment
def evaluate_alignment(onsets_correct, onsets_predicted):
    score = 0
    for correct_onset in onsets_correct:
        found = False
        for predicted_onset in onsets_predicted:
            diff = abs(predicted_onset - correct_onset)
            if diff < 4:
                score = score + 1
                found = True
                break
        if found == False:
            print('incorrect onset : ' + str(correct_onset))
    return score

In [None]:
evaluate_alignment(onsets_correct, onsets_sig1)