In [38]:
from bs4 import BeautifulSoup as bs
from decimal import *
import sys
import os

#Duration of every <durationType> value, expressed as a fraction of a whole note.
#The 'measure' slot starts empty; ifTimeSig() overwrites it with sigN / sigD
#each time a time signature is read.
duraDic = {'measure': None, 'longa': Decimal(4), 'breve': Decimal(2), 'whole': Decimal(1)}
duraDic.update(
    (noteType, Decimal(1) / divisor)
    for noteType, divisor in (
        ('half', 2),
        ('quarter', 4),
        ('eighth', 8),
        ('16th', 16),
        ('32nd', 32),
        ('64th', 64),
        ('128th', 128),
    )
)

#Value of a <program> tag (kept as a string) -> label of the piano-like sound.
#NOTE(review): these look like General MIDI program numbers 0-8 -- confirm.
pianoSrc = dict(
    (str(programNo), label)
    for programNo, label in enumerate((
        'P_acoustic_grand',
        'P_bright_acoustic',
        'P_electric_grand',
        'P_honky_tonk',
        'P_electric1',
        'P_electric2',
        'P_harpsichord',
        'P_clavichord',
        'P_celesta',
    ))
)

#Substrings searched for (case-insensitively) in the instrument id / long name
#of a part to decide whether it is a piano-like instrument.
pianoFamily = (
    'keyboard',
    'piano',
    'harpsichord',
    'clavichord',
    'celesta',
)

In [39]:
def main(dirPath):
    """Parse every score file found in ``dirPath`` and extract its piano staffs.

    Each directory entry is printed, parsed as XML with BeautifulSoup and, when
    its <Division> value is '480', handed to deterPianoStaffs().

    Parameters:
        dirPath: directory holding the .mscx files. A trailing path separator
            is accepted but no longer required.
    """
    for filename in os.listdir(dirPath):
        print(filename)

        #os.path.join works whether or not dirPath ends with a separator
        fullPath = os.path.join(dirPath, filename)

        #Sub-directories (and other non-files) cannot be opened as scores
        if not os.path.isfile(fullPath):
            continue

        with open(fullPath, 'r') as f:
            mscx = bs(f.read(), 'xml')

        #Continue to next file if the Division is missing or != 480
        division = mscx.find('Division')
        if division is None or division.text != '480':
            continue
        deterPianoStaffs(mscx, filename)

In [40]:
def getInstrumentData(part):
    """Collect the staff ids and instrument descriptors of one <Part>.

    The <Staff>, <longName>, <instrumentId> and <program> descendants of
    ``part`` are visited in the order find_all() yields them; scanning stops
    at the first <program>.

    Returns:
        (staffId, labInst, repInst, srcInst) where
        staffId -- list of the 'id' attributes of the <Staff> tags met,
        labInst -- text of the last <longName> met (labeled instrument),
        repInst -- text of the last <instrumentId> met (represented instrument),
        srcInst -- 'value' attribute of the first <program> (source instrument).
        The three strings default to '' when the tag is absent.
    """
    staffId = []
    texts = {'longName': '', 'instrumentId': ''}
    srcInst = ''
    for node in part.find_all(['Staff', 'longName', 'instrumentId', 'program']):
        kind = node.name
        if kind == 'Staff':
            staffId.append(node['id'])
            continue
        if kind in texts:
            texts[kind] = node.text
            continue
        #Anything else is a <program>: stop at the 1st one found in <Channel>
        srcInst = node['value']
        break
    return staffId, texts['longName'], texts['instrumentId'], srcInst

In [41]:
#Determine which staffs play piano
def deterPianoStaffs(mscx, filename):
    """Find the parts played by a piano-like instrument and extract their beats.

    A part qualifies when its <program> value is a key of pianoSrc, when one of
    the pianoFamily words occurs in its instrument id or long name
    (case-insensitive), and when its long name does not contain 'bass'.
    A part with exactly one staff goes to singleStaffBeatsPerMeasure(); any
    other part goes to multiStaffBeatsPerMeasure().
    """
    for part in mscx.find_all('Part'):

        #Get instrument data from metadata of mscx
        staffId, labInst, repInst, srcInst = getInstrumentData(part)

        #Skip every part that is not a piano staff
        if srcInst not in pianoSrc:
            continue
        if not any(pf in repInst.lower() or pf in labInst.lower() for pf in pianoFamily):
            continue
        if 'bass' in labInst.lower():
            continue

        if len(staffId) == 1:
            singleStaff = mscx.select_one('Score > Staff:nth-of-type(' + staffId[0] + ')')
            print('[=============== Staff #' + singleStaff['id'] + ' ===============]')
            singleStaffBeatsPerMeasure(singleStaff, pianoSrc[srcInst], filename)
        else: #len(staffId) > 1
            multiStaff = [mscx.select_one('Score > Staff:nth-of-type(' + Id + ')') for Id in staffId]
            banner = ''.join(' #' + Id for Id in staffId)
            print('[=============== Multi-Staff' + banner + ' ===============]')
            multiStaffBeatsPerMeasure(multiStaff, pianoSrc[srcInst], filename)

In [42]:
def ifTimeSig(measure):
    """Record the time signature found in ``measure``.

    Reads the <sigN> and <sigD> texts, stores sigN / sigD (the measure length
    as a fraction of a whole note) in duraDic['measure'] and returns
    480 * sigN as a Decimal.

    NOTE(review): the return value is used as the measure length in ticks, but
    it does not depend on sigD -- confirm this is intended for signatures whose
    denominator is not 4.
    """
    numerator, denominator = [Decimal(measure.find(name).text) for name in ('sigN', 'sigD')]
    duraDic['measure'] = numerator / denominator
    return 480 * numerator

In [43]:
#Only called by singleStaffBeatsPerMeasure()
def avgPitchLT48(measure, filename, logPath='C:/Users/BigData/Desktop/PianoStaffBeatsError.log'):
    """Tell whether the average pitch of ``measure`` is lower than 48.

    Parameters:
        measure: <Measure> tag; its <pitch> descendants are averaged and its
            'number' attribute is used in the log message.
        filename: name of the score file, written to the log.
        logPath: file the message is appended to. Defaults to the path that
            used to be hard-coded, so existing callers are unaffected.

    Returns:
        True (after appending a line to the log) when the average pitch is
        below 48; False when it is >= 48 or when the measure has no pitch.
    """
    #Query the document once instead of three times
    pitches = measure.find_all('pitch')
    if not pitches:
        return False

    avgPitch = sum(float(pitch.text) for pitch in pitches) / len(pitches)
    if avgPitch >= 48:
        return False

    with open(logPath, 'a') as f:
        f.write('[' + filename + ']\n' + 'The average pitch of measure #' + measure['number'] + ' is ' + str(avgPitch) + ', < 48\n')
    return True

In [64]:
def ifTrackChanges(tag, isTrack, track_key, track_N_time_acc):
    """Update the track bookkeeping when ``tag`` belongs to another track.

    A tag without a <track> child belongs to 'track0'; otherwise its track key
    is 'track' + the text of that child.

    Returns:
        (isTrack, ifBreak, track_key, track_N_time_acc). ifBreak is True when a
        nonzero track is left although its accumulated duration differs from
        duraDic['measure']; in that case, and whenever a nonzero track is left,
        track_N_time_acc is reset to 0.
    """
    trackTag = tag.find('track')

    if not trackTag:
        #Still on track #0: nothing changes
        if track_key == 'track0':
            return isTrack, False, track_key, track_N_time_acc
        #Track changes from another to #0
        incomplete = track_N_time_acc != duraDic['measure']
        return False, incomplete, 'track0', 0

    tagKey = 'track' + trackTag.text

    #Track changes from #0 to another number
    if track_key == 'track0':
        return True, False, tagKey, track_N_time_acc

    #Track changes from a nonzero number to another nonzero number
    if track_key != tagKey:
        incomplete = track_N_time_acc != duraDic['measure']
        return isTrack, incomplete, tagKey, 0

    #Same nonzero track as before
    return isTrack, False, track_key, track_N_time_acc

In [45]:
def ifDots(tag):
    """Return the duration multiplier implied by the <dots> child of ``tag``.

    n dots lengthen a note to 2 - 1/2**n times its plain duration
    (1 dot -> 1.5, 2 -> 1.75, 3 -> 1.875, ...). Without a <dots> child the
    multiplier is 1.

    The previous if/elif chain only knew 1, 2 and 3 dots and hit an unbound
    local variable for any other value.

    Returns:
        Decimal multiplier.

    Raises:
        ValueError: if the <dots> text is not an integer or is negative.
    """
    dotsTag = tag.find('dots')
    if not dotsTag:
        return Decimal(1)

    dots = int(dotsTag.text)
    if dots < 0:
        raise ValueError('negative number of dots: ' + str(dots))

    #Each dot adds half of the previously added value: 1 + 1/2 + 1/4 + ...
    return Decimal(2) - Decimal(1) / 2 ** dots

In [46]:
def buildTrackString(tag, multi, thisMeasure, track_key, reduceNotes=None):
    """Append the entry for ``tag`` to the string of the current track.

    An entry is '<duration>,<note>;' where <duration> is the duraDic value of
    the tag's first <durationType> times ``multi``, and <note> is '0' for
    anything but a <Chord>, '1' for a <Chord>, or '1,<pitch>,<pitch>...' for a
    <Chord> when ``reduceNotes`` equals True.

    Returns:
        (duTime, track string) -- the Decimal duration of this tag and
        thisMeasure[track_key] with the new entry appended. thisMeasure itself
        is not modified.
    """
    durationName = tag.select('durationType')[0].text
    duTime = Decimal(duraDic[durationName]) * multi

    if tag.name != 'Chord':
        noteStr = '0'
    elif reduceNotes == True:
        noteStr = '1' + ''.join(',' + pitch.text for pitch in tag.find_all('pitch'))
    else:
        noteStr = '1'

    entry = str(duTime) + ',' + noteStr + ';'
    return duTime, thisMeasure[track_key] + entry

In [47]:
def accuDura(isTrack, track_0_time_acc, track_N_time_acc, duTime, ticking, measureDivision):
    """Add ``duTime`` to the accumulator of the track currently being read.

    On track #0 (isTrack == False) only track_0_time_acc grows. On a nonzero
    track track_N_time_acc grows and a pending ``ticking`` value is checked:
    a multiple of ``measureDivision`` is consumed (reset to None), anything
    else requests a break.

    Returns:
        (track_0_time_acc, track_N_time_acc, ticking, ifBreak)
    """
    if isTrack == False:
        return track_0_time_acc + duTime, track_N_time_acc, ticking, False

    track_N_time_acc = track_N_time_acc + duTime

    #A nonzero track has to start at the head of the measure
    misaligned = ticking is not None and ticking % measureDivision != 0
    if not misaligned:
        ticking = None
    return track_0_time_acc, track_N_time_acc, ticking, misaligned

In [48]:
def ifTuplet(tag):
    """Describe a <Tuplet> tag.

    Returns:
        (not3579, tupID, tupRatio) where
        not3579  -- True unless the text selected by 'Number > text' is
                    '3', '5', '7' or '9',
        tupID    -- the 'id' attribute of the tag,
        tupRatio -- <normalNotes> / <actualNotes> as a Decimal.
    """
    shownNumber = tag.select_one('Number > text').text
    tupID = tag['id']
    normalNotes, actualNotes = [Decimal(tag.find(name).text) for name in ('normalNotes', 'actualNotes')]
    return shownNumber not in ('3', '5', '7', '9'), tupID, normalNotes / actualNotes

In [49]:
def duraError(track_0_time_acc, filename, measure, logPath='C:/Users/BigData/Desktop/PianoStaffBeatsError.log'):
    """Check the duration accumulated on track #0 against the time signature.

    Parameters:
        track_0_time_acc: summed duration of the notes read on track #0.
        filename: name of the score file, written to the log.
        measure: <Measure> tag; its 'number' attribute is written to the log.
        logPath: file the message is appended to. Defaults to the path that
            used to be hard-coded, so existing callers are unaffected.

    Returns:
        True (after appending a line to the log) when the accumulated duration
        differs from duraDic['measure'] by 0.000000001 or more, else False.
    """
    expected = duraDic['measure']
    if abs(track_0_time_acc - expected) >= 0.000000001:
        with open(logPath, 'a') as f:
            f.write('[' + filename + ']\n' + 'Continue to next measure due to track_0_time_acc of measure #' + measure['number'] + ' == ' + str(track_0_time_acc) + ', != ' + str(expected) + '\n')
        return True
    return False

In [50]:
#Only called by singleStaffBeatsPerMeasure()
def findDoubleNoteInMultiTrack(trackSign, measure, thisMeasure, filename, logPath='C:/Users/BigData/Desktop/PianoStaffBeatsError.log'):
    """Reduce a multi-track measure to the track of its first multi-note chord.

    Nothing happens unless ``trackSign`` is 'multi_track' and the measure holds
    at least one <Chord>. Otherwise the first <Chord> with more than one <Note>
    selects the track to keep, and every other 'track*' entry is deleted from
    ``thisMeasure`` in place.

    The previous implementation cleared the whole dictionary, which also threw
    away the 'duration' and 'filename' entries the caller had just added; only
    the other track entries are removed now.

    Parameters:
        trackSign: 'single_track' or 'multi_track'.
        measure: <Measure> tag being processed.
        thisMeasure: dict of the measure (track strings plus metadata).
        filename: name of the score file, written to the log.
        logPath: file the message is appended to. Defaults to the path that
            used to be hard-coded, so existing callers are unaffected.

    Returns:
        (thisMeasure, ifContinue). ifContinue is True -- after a line has been
        appended to the log -- when no chord of the multi-track measure has
        more than one note.
    """
    if trackSign != 'multi_track' or not measure.find('Chord'):
        return thisMeasure, False

    for chord in measure.find_all('Chord'):
        if len(chord.find_all('Note')) > 1:
            #A chord without a <track> child belongs to track #0
            trackTag = chord.find('track')
            keptTrack = ('track' + trackTag.text) if trackTag else 'track0'

            #Drop the other tracks only; keep 'duration', 'filename', ...
            for key in list(thisMeasure):
                if key.startswith('track') and key != keptTrack:
                    del thisMeasure[key]
            return thisMeasure, False

    with open(logPath, 'a') as f:
        f.write('[' + filename + ']\n' + '#Continue to next measure if all notes in this multi-track measure are single-note\n')
    return thisMeasure, True

In [51]:
#Extract beats in every track of a measure
#All break statements are meant to continue the outer loop "for measure" from inner loop "for tag"
#Statement continue in the ELSE block of inner loop "for tag" affects the outer loop "for measure"
def singleStaffBeatsPerMeasure(singleStaff, srcPiano, filename):
    """Print the beat string of every usable measure of a one-staff piano part.

    For each <Measure> of ``singleStaff`` the direct <Rest>, <Chord>, <Tuplet>
    and <tick> children are read in order and turned into one string per track
    ('<duration>,<0|1>' entries joined by ';'). Measures that fail one of the
    checks below are skipped; the others are printed as
    {'single_staff': {...}}.

    Parameters:
        singleStaff: <Staff> tag holding the measures.
        srcPiano: label of the piano sound; not used in this function.
        filename: score file name; logged on errors and stored, minus its last
            five characters, under the 'filename' key.

    Notes:
        measureDivision, tupID and tupRatio are only assigned when a <TimeSig>
        or <Tuplet> is met and keep their value from one measure to the next;
        using them before the first assignment raises an UnboundLocalError.
    """
    for measure in singleStaff.find_all('Measure'):
        
        #Determine the 1st time signature or modify it once it changes
        if measure.find('TimeSig'):
            measureDivision = ifTimeSig(measure)
        
        #Continue to next measure if the average pitch is less than 48
        if avgPitchLT48(measure, filename) == True:
            continue
        
        #Initialization
        #thisMeasure gets one (initially empty) string per track number found anywhere in the measure
        track_0_time_acc = 0
        isTrack = False
        track_key = 'track0'
        track_N_time_acc = 0
        ticking = None
        thisMeasure = {track_key:''}
        for track in measure.find_all('track'):
            if thisMeasure.get('track'+track.text) is None:
                thisMeasure.update({'track'+track.text:''})
        trackSign = 'single_track' if len(thisMeasure) == 1 else 'multi_track'
        
        #Continue to next measure if all notes in a single-track measure are single-note
        #Every chord is a single note (==1) <=> no chord is not a single note (!=1) -> not len(chord.find_all('Note')) != 1
        #NOTE(review): the emptiness test relies on filter() returning a list (Python 2)
        if trackSign == 'single_track' and measure.find('Chord')\
        and not filter(lambda chord: len(chord.find_all('Note')) != 1, measure.find_all('Chord')):
            continue
            
        #Set recursive=False to avoid <Tuplet> inside <Chord> or <Rest>
        #Every break below leaves this loop without running its else block, i.e. the measure is dropped
        for tag in measure.find_all(['Rest','Chord','Tuplet','tick'], recursive=False):
            if tag.name == 'Chord' or tag.name == 'Rest':

                #Do something if the track number changes to another number
                #Continue to next measure if track_N_time_acc is not equivalent to measure duration
                isTrack, ifBreak, track_key, track_N_time_acc = ifTrackChanges(tag, isTrack, track_key, track_N_time_acc)
                if ifBreak:
                    break
                
                #For the duration of a note, modify the multiplier if it's with dots or within a tuplet
                #tupID and tupRatio come from the most recent top-level <Tuplet> tag
                multi = ifDots(tag)
                if tag.find('Tuplet') and tag.find('Tuplet').text == tupID and not tag.find('dots'):
                    multi *= tupRatio
                
                #Continue to next measure if this measure is with a dot found in a tuplet
                elif tag.find('Tuplet') and tag.find('Tuplet').text == tupID and tag.find('dots'):
                    break
                    
                #Build the string for every track
                duTime, thisMeasure[track_key] = buildTrackString(tag, multi, thisMeasure, track_key)
                
                #Accumulate note duration for checking special cases
                #Continue to next measure if a nonzero track doesn't start at head of this measure
                track_0_time_acc, track_N_time_acc, ticking, ifBreak = accuDura(isTrack, track_0_time_acc, track_N_time_acc, duTime, ticking, measureDivision)
                if ifBreak:
                    break
                            
            elif tag.name == 'Tuplet':
                #Continue to next measure if this measure is with a tuplet not of 3, 5, 7 or 9
                not3579, tupID, tupRatio = ifTuplet(tag)
                if not3579:
                    break
            else: #tag.name == 'tick'
                #Remember the tick value; accuDura() checks it on the next note of a nonzero track
                ticking = int(tag.text)
        else:
            #Reached only when the loop above ended without a break
            #Continue to next measure if the duration of the last nonzero track is shorter than the measure duration
            if trackSign == 'multi_track' and track_N_time_acc != duraDic['measure']:
                continue
                
            #Log if the accumulation of note duration doesn't match Time Signature
            if duraError(track_0_time_acc, filename, measure):
                continue
            
            #Trim the trailing ';' of the strings in the measure dictionary
            #filename[:-5] drops the last five characters (presumably the '.mscx' extension)
            thisMeasure = {track:thisMeasure[track][:-1] for track in thisMeasure}
            thisMeasure.update({'duration':duraDic['measure'], 'filename':filename[:-5]})
            
            #In a 'multi-track' measure, find a track with a double-note or more and remove all the others from the measure dictionary
            #Continue to next measure if all notes in this 'multi-track' measure are single-note
            thisMeasure, ifContinue = findDoubleNoteInMultiTrack(trackSign, measure, thisMeasure, filename)
            if ifContinue:
                continue
            
            print {'single_staff':thisMeasure}

In [178]:
#Only called by multiStaffBeatsPerMeasure()
def getRoot(measureColumn):
    """Find the root pitch of one column of measures.

    Parameters:
        measureColumn: list of dicts mapping a track key to a track string,
            e.g. [{'t0':'1,0','t1':'0.5,0;0.5,1,58'}, {'t0':'0.5,1,54,58;0.5,0'}].
            A track string is a ';'-separated list of note strings
            '<duration>,<flag>[,<pitch>...]' whose flag is '1' for a chord and
            '0' for a rest.

    Returns:
        (root, flatColumn). flatColumn is the flat list of all track strings,
        e.g. ['1,0', '0.5,0;0.5,1,58', '0.5,1,54,58;0.5,0']. root is an int:
        among the first chords of every track, those starting earliest are
        kept and the lowest of their first listed pitches is returned. root is
        None when no track contains a chord (rest measure).

    Compared with the nested map/filter/reduce version this one reads every
    track once (no repeated prefix sums, no minimum recomputed per candidate),
    does not depend on map()/filter() returning lists, and ignores empty track
    strings instead of raising IndexError.
    """
    #Reduce the list of dicts to a 1-dimension list holding only track strings
    flatColumn = [tracks[key] for tracks in measureColumn for key in tracks]

    #For every track: onset and fields of its first chord, if it has one
    firstChords = []
    for trackString in flatColumn:
        onset = 0
        for noteString in trackString.split(';'):
            fields = noteString.split(',')
            if len(fields) < 2:
                #Empty track string: nothing to read
                continue
            if fields[1] == '1':
                #Onsets used to be compared after a round trip through
                #Python 2's str(float), i.e. rounded to 12 significant digits.
                #Keep that rounding so float noise cannot separate equal onsets.
                firstChords.append((float('%.12g' % onset), fields))
                break
            onset = onset + float(fields[0])

    #No chord at all: this is a rest measure
    if not firstChords:
        return None, flatColumn

    #Keep the chords starting earliest and take the lowest first pitch as root
    earliest = min(start for start, _ in firstChords)
    root = min(int(fields[2]) for start, fields in firstChords if start == earliest)
    return root, flatColumn

In [190]:
#Only called by multiStaffBeatsPerMeasure()
def getTonality(root, flatColumn):
    """Guess whether a measure is major or minor relative to ``root``.

    Parameters:
        root: int pitch all pitches are made relative to.
        flatColumn: list of track strings; each is a ';'-separated list of
            note strings '<duration>,<flag>[,<pitch>...]'.

    Returns:
        (tonality, relativePitch). relativePitch is flatColumn with every
        pitch replaced by (pitch - root) and the flag of pitched notes set to
        '1'; note strings without a pitch are kept as they are. tonality is
        'major' when at least 75% of the relative pitches are a unison, major
        third or fifth (modulo 12), otherwise 'minor' when at least 75% are a
        unison, minor third or fifth, otherwise None. It is also None when
        there is no pitch at all.

    The interval tests used to read ``int(rp)-4 % 12 == 0``, which Python
    parses as ``int(rp) - (4 % 12) == 0``: thirds and fifths were recognised
    only inside the first octave above the root. The interval is now reduced
    modulo 12 before it is compared.
    """
    relativePitch = []
    flatPitch = []
    for trackString in flatColumn:
        shiftedNotes = []
        for noteString in trackString.split(';'):
            fields = noteString.split(',')
            if len(fields) > 2:
                intervals = [int(pitch) - root for pitch in fields[2:]]
                flatPitch.extend(intervals)
                noteString = fields[0] + ',1' + ''.join(',' + str(interval) for interval in intervals)
            shiftedNotes.append(noteString)
        relativePitch.append(';'.join(shiftedNotes))

    #Without any pitch the ratios are undefined
    count = len(flatPitch)
    if count == 0:
        return None, relativePitch

    #Count unisons (0), minor thirds (3), major thirds (4) and fifths (7)
    tally = {0: 0, 3: 0, 4: 0, 7: 0}
    for interval in flatPitch:
        pitchClass = interval % 12
        if pitchClass in tally:
            tally[pitchClass] += 1

    majorRatio = float(tally[0] + tally[4] + tally[7]) / count
    minorRatio = float(tally[0] + tally[3] + tally[7]) / count

    #Major wins when both ratios reach the threshold
    if majorRatio >= 0.75:
        tonality = 'major'
    elif minorRatio >= 0.75:
        tonality = 'minor'
    else:
        tonality = None
    return tonality, relativePitch

In [189]:
#Extract beats in every track of a measure, for a group of staffs played by one instrument
#Loops are nested as "for nth" (measure number) > "for staff" > "for tag"
#A break in "for tag" skips that loop's else block, so the measure of the current staff is not added to measureColumn
#A continue in the else block of "for tag" (or before it) moves on to the next staff, not to the next measure number
#"for staff" contains no break of its own, so its else block runs for every measure number
def multiStaffBeatsPerMeasure(multiStaff, srcPiano, filename):
    """Print the beat strings of every measure column of a multi-staff part.

    For each measure number the measure of every staff is turned into one
    string per track ('<duration>,<0|1[,pitch...]>' entries joined by ';').
    The usable ones are gathered in measureColumn, getRoot() and getTonality()
    make their pitches relative to the root, and the result is printed as
    {'multi_staff': {...}}.

    Parameters:
        multiStaff: list of <Staff> tags; the number of measures is taken from
            the first one.
        srcPiano: label of the piano sound; not used in this function.
        filename: score file name; logged on errors and stored, minus its last
            five characters, under the 'filename' key.

    Notes:
        measureDivision, tupID and tupRatio are only assigned when a <TimeSig>
        or <Tuplet> is met and keep their value afterwards; using them before
        the first assignment raises an UnboundLocalError.
    """
    for nth in range(1, len(multiStaff[0].find_all('Measure'))+1):

        #To gather track-strings in measures with the same number of this group of piano staffs
        measureColumn = []
        
        #In this loop, handle measures with the same number in all the staffs each time
        for staff in multiStaff:
            measure = staff.select_one('Measure:nth-of-type('+str(nth)+')')

            #Determine the 1st time signature or modify it once it changes
            if measure.find('TimeSig'):
                measureDivision = ifTimeSig(measure)

            #Skip this staff's measure if there's 'len' attribute in "additional" measure #1
            if measure.get('len'):
                continue

            #Initialization
            #thisMeasure gets one (initially empty) string per track number found anywhere in the measure
            track_0_time_acc = 0
            isTrack = False
            track_key = 'track0'
            track_N_time_acc = 0
            ticking = None
            thisMeasure = {track_key:''}
            for track in measure.find_all('track'):
                if thisMeasure.get('track'+track.text) is None:
                    thisMeasure.update({'track'+track.text:''})
            trackSign = 'single_track' if len(thisMeasure) == 1 else 'multi_track'

            #Set recursive=False to avoid <Tuplet> inside <Chord> or <Rest>
            for tag in measure.find_all(['Rest','Chord','Tuplet','tick'], recursive=False):
                if tag.name == 'Chord' or tag.name == 'Rest':
                    
                    #Do something if the track changes to another number
                    #Drop this staff's measure if track_N_time_acc is not equivalent to measure duration
                    isTrack, ifBreak, track_key, track_N_time_acc = ifTrackChanges(tag, isTrack, track_key, track_N_time_acc)
                    if ifBreak:
                        break
                        
                    #For the duration of a note, modify the multiplier if it's with dots or within a tuplet
                    #tupID and tupRatio come from the most recent top-level <Tuplet> tag
                    multi = ifDots(tag)
                    if tag.find('Tuplet') and tag.find('Tuplet').text == tupID and not tag.find('dots'):
                        multi *= tupRatio
                        
                    #Drop this staff's measure if it is with a dot found in a tuplet
                    elif tag.find('Tuplet') and tag.find('Tuplet').text == tupID and tag.find('dots'):
                        break

                    #Build the string for every track (True: chords also list their pitches)
                    duTime, thisMeasure[track_key] = buildTrackString(tag, multi, thisMeasure, track_key, True)

                    #Accumulate note duration for checking special cases
                    #Drop this staff's measure if a nonzero track doesn't start at head of this measure
                    track_0_time_acc, track_N_time_acc, ticking, ifBreak = accuDura(isTrack, track_0_time_acc, track_N_time_acc, duTime, ticking, measureDivision)
                    if ifBreak:
                        break
                        
                elif tag.name == 'Tuplet':
                    #Drop this staff's measure if it is with a tuplet not of 3, 5, 7 or 9
                    not3579, tupID, tupRatio = ifTuplet(tag)
                    if not3579:
                        break
                else: #tag.name == 'tick'
                    #Remember the tick value; accuDura() checks it on the next note of a nonzero track
                    ticking = int(tag.text)
            else:
                #Reached only when the loop above ended without a break
                #Skip this staff's measure if the duration of the last nonzero track is shorter than the measure duration
                if trackSign == 'multi_track' and track_N_time_acc != duraDic['measure']:
                    continue

                #Log if the accumulation of note duration doesn't match Time Signature
                if duraError(track_0_time_acc, filename, measure):
                    continue

                #Trim the trailing ';' of the strings in the measure dictionary.
                #Append measures with the same number to measureColumn
                thisMeasure = {track:thisMeasure[track][:-1] for track in thisMeasure}
                measureColumn.append(thisMeasure)
        else:
            #Runs once per measure number, with whatever staff measures made it into measureColumn
            #NOTE(review): when every staff's measure was dropped, measureColumn is empty and the
            #column is printed as a rest measure -- confirm this is intended
            #Determine the pitch of the root
            root, flatColumn = getRoot(measureColumn)
            
            #Determine the tonality if there's a root else this is a rest measure
            #NOTE(review): a root of 0 is falsy and is therefore handled like "no root"
            if root:
                tonality, relativePitch = getTonality(root, flatColumn)
                #Continue to next measure number if neither major nor minor could be determined
                if tonality is None:
                    continue
                thisMeasure = {'track'+str(i):s for i,s in enumerate(relativePitch)}
            else:
                thisMeasure = {'track0':'1,0'}
            
            #filename[:-5] drops the last five characters (presumably the '.mscx' extension)
            thisMeasure.update({'duration':duraDic['measure'], 'filename':filename[:-5]})
            print {'multi_staff':thisMeasure}

In [None]:
#Entry point: process every file of the score directory.
#The path is hard-coded and machine-specific; main() concatenates it with each
#file name, so it has to end with a path separator.
main('C:/Users/BigData/Desktop/mscx/')