In [1]:
import muspy
import numpy as np
import math
import matplotlib.pyplot as plt
import os

In [2]:
test = muspy.read_musicxml("../data/xml_files/itomori.xml")

In [3]:
def get_resolution_threshold(mus, threshold=100):
    """Return the nearest even MusPy resolution needed to consider notes 'threshold' milliseconds apart as different (but no lower)."""
    return [round(((x.qpm / 60) ** -1 * 1000 / threshold) / 2) * 2 for x in mus.tempos]

def parse_input(true_data, path, threshold=100, resolution=None, align_delta=0):
    cntTab = {'2/2':2, '3/2':3, '4/2':4, '2/4':4, '3/4':3, '4/4':4, '5/4':5, '6/8':6, '7/8':3, '9/8':9, '11/8':4, '12/8':4, '3/8': 4, '4/8': 8, '8/8': 8}
    """Parse a .txt file output from the follow.js API into a MusPy object."""
    'TODO: Add support for multiple tempos'
    with open(path) as f:
        raw_data = f.readlines()[1:]
    
    raw_data = list(filter(lambda x: 'CountInStart' not in x, raw_data))

    #raw_data = np.array([[float(y) for y in x.strip("\n|,").split(",")] for x in raw_data]) # raw_data is now a list of (time, pitch, velocity) tuples
    #parsed_data = np.zeros((raw_data.shape[0] // 2, 4))
    raw_data = [[float(y) for y in x.strip("\n|,").split(",")] for x in raw_data if x[-2] != ',']
    parsed_data = []
    num = true_data.time_signatures[0].numerator
    den = true_data.time_signatures[0].denominator
    countoff_offset = cntTab[f'{num}/{den}'] / (den / 4) * (true_data.tempos[0].qpm / 60) ** -1 # time offset in milliseconds to account for countoff
    raw_res = get_resolution_threshold(true_data, threshold)[0]
    if resolution is not None:
        raw_res = resolution
    time2beats = lambda x: math.floor(x * true_data.tempos[0].qpm * raw_res / 60)

    # def time2beats(x):
    #     roundbeats = round(x * true_data.tempos[0].qpm * raw_res / 60)
    #     timeInBeat = roundbeats % resolution
    #     if resolution != 12:
    #         raise NotImplementedError
    #     upb = roundbeats
    #     downb = roundbeats
    #     if timeInBeat not in [0,3,4,6,8,9]:
    #         while True:
    #             upb += 1
    #             downb -= 1
    #             if downb % resolution in [0,3,4,6,8,9]:
    #                 return downb
    #             if upb % resolution in [0,3,4,6,8,9]:
    #                 return upb
    #     else:
    #         return roundbeats
            
    min_len = true_data.tempos[0].qpm / (60 * raw_res) * 3
    # for i in np.arange(0, raw_data.shape[0], 2):
    #     parsed_data[i // 2] = np.array(
    #         [
    #             time2beats(raw_data[i][0] / 1000 - countoff_offset), # time in new resolution
    #             raw_data[i][1], # MIDI pitch
    #             time2beats(min((raw_data[i+1][0] - raw_data[i][0]) / 1000), ), # duration in new resolution
    #             raw_data[i][2] # velocity
    #         ],
    #     dtype=int)
    n = len(raw_data)
    for i in np.arange(0, n):
        if raw_data[i][-1] in [0,1,2,3,4,5,6,7,8,9,10,127]:
            continue
        cur_data = raw_data[i]
        for ed in np.arange(i+1, n):
            if raw_data[ed][1] == cur_data[1]:
                end_data = raw_data[ed]
                raw_data[ed][-1] = 0
                break
        parsed_data.append(
            [
                time2beats(max((cur_data[0] + align_delta) / 1000 - countoff_offset, 0)), # time in new resolution
                raw_data[i][1], # MIDI pitch
                time2beats(max((end_data[0] - cur_data[0]) / 1000,  min_len)), # duration in new resolution
                raw_data[i][2] # velocity
            ]
        )
    parsed_data = np.array(parsed_data)
    input_mus = muspy.from_note_representation(parsed_data.astype(int), resolution=raw_res)
    return input_mus

In [4]:
from operator import attrgetter

def to_stackedpitch_representation(music, use_hold_state=False):
    # Collect notes
    notes = []
    for track in music.tracks:
        notes.extend(track.notes)

    # Raise an error if no notes are found
    if not notes:
        raise RuntimeError("No notes found.")

    # Sort the notes
    notes.sort(key=attrgetter("time", "pitch", "duration", "velocity"))
    # Initialize the array
    length = max((note.end for note in notes))
    #print(length)
    #array = np.zeros((length, 1), dtype=int)

    # # Fill the array with rests
    array = []
    ticker = 0
    arr_idx = 0
    while ticker < length:
        addarr = 0
        cmpnd = set()
        for i in range(arr_idx, len(notes)):
            if notes[i].time == ticker:
                cmpnd.add(notes[i])
                #array.append(notes[i].pitch)
                addarr += 1
            else:
                break
        if len(cmpnd) > 0:
            array.append(cmpnd)
        else:
            array.append('_')
        arr_idx += addarr
        ticker += 1
    return array

def clean_output(true_path, out_path, threshold=100, resolution=None):
    """Clean a .txt file output from the follow.js API into a MusPy object."""
    if true_path.endswith(".json"):
        true_mus = muspy.load(true_path)
    elif true_path.endswith(".xml"):
        true_mus = muspy.read_musicxml(true_path)
    elif true_path.endswith(".abc"):
        true_mus = muspy.read_abc(true_path)
    else:
        raise NotImplementedError("File type not supported.")
    
    if type(true_mus) == list:
        true_mus = true_mus[0]
    best_mus = None
    best_score = None
    input_mus = parse_input(true_mus, out_path, threshold, resolution=12)
    sptrue = to_stackedpitch_representation(true_mus.adjust_resolution(12))
    spin = to_stackedpitch_representation(input_mus.adjust_resolution(12))
    spout = nw_note(spin, sptrue, resolution=12)
    input_mus = from_stackedpitch_representation(input_mus, spout)
    finest_rhythm = round(sorted(np.unique((true_mus.to_note_representation()[:, 0] / true_mus.resolution) % 1))[1] ** -1) # the finest rhythm in the piece
    true_mus.adjust_resolution(target=resolution)
    input_mus.adjust_resolution(target=resolution)
    input_mus.barlines = true_mus.barlines
    input_mus.time_signatures = true_mus.time_signatures
    bartimes = np.array([x.time for x in input_mus.barlines])
    bartime2n = {x: i for i,x in enumerate(bartimes)}
    # score = 0
    for i, note in enumerate(true_mus.tracks[0].notes):
        note.bar_n = bartime2n[bartimes[bartimes <= note.time].max()] # Label with 0-indexed bar number
    ticker = 0
    true_note_idx = 0
    inp_note_idx = 0
    while ticker < true_mus.tracks[0].notes[-1].end:
        true_tick = []
        true_notes = []
        input_tick = []
        # get notes at current tick
        for idx in range(true_note_idx, len(true_mus.tracks[0].notes)):
            if true_mus.tracks[0].notes[idx].time == ticker:
                true_tick.append(idx)
                true_notes.append(true_mus.tracks[0].notes[idx].pitch)
            else:
                break
        true_note_idx += len(true_tick)

        for idx in range(inp_note_idx, len(input_mus.tracks[0].notes)):
            if input_mus.tracks[0].notes[idx].time == ticker:
                input_tick.append(idx)
            else:
                break
        inp_note_idx += len(input_tick)

        for idx in input_tick:
            if input_mus.tracks[0].notes[idx].pitch in true_notes:
                input_mus.tracks[0].notes[idx].label = 1
            else:
                input_mus.tracks[0].notes[idx].label = 0
            input_mus.tracks[0].notes[idx].bar_n = bartime2n[bartimes[bartimes <= input_mus.tracks[0].notes[idx].time].max()]
        
        ticker += 1
    

    # score /= len(input_mus.tracks[0].notes)
    # print(ad, score)
    return input_mus, true_mus

def seq2string(s):
    out = []
    for i in range(len(s)):
        if type(s[i]) == set:
            pitchlist = sorted([x.pitch for x in s[i]])
            out.append('-'.join([str(x) for x in pitchlist]))
        elif type(s[i]) == str:
            out.append(s[i])
        else:
            out.append(s[i].pitch)
    return " ".join(out)

def seq2seqstring(s):
    out = []
    for i in range(len(s)):
        if type(s[i]) == set:
            pitchlist = sorted([x.pitch for x in s[i]])
            out.append('-'.join([str(x) for x in pitchlist]))
        elif type(s[i]) == str:
            out.append(s[i])
        else:
            out.append(str(s[i].pitch))
    return out

def nw_note(inx, iny, match = 1, mismatch = 2, gap = 20, resolution=4):
    # match /= resolution
    # mismatch /= resolution
    gap /= resolution
    x = list(seq2seqstring(inx))
    y = list(seq2seqstring(iny))
    nx = len(x)
    ny = len(y)
    # Optimal score at each possible pair of characters.
    F = np.zeros((nx + 1, ny + 1))
    F[:,0] = np.linspace(0, -nx * gap, nx + 1)
    F[0,:] = np.linspace(0, -ny * gap, ny + 1)
    # Pointers to trace through an optimal aligment.
    P = np.zeros((nx + 1, ny + 1))
    P[:,0] = 3
    P[0,:] = 4
    # Temporary scores.
    t = np.zeros(3)
    for i in range(nx):
        for j in range(ny):
            if x[i] == y[j]:
                t[0] = F[i,j] + match
            else:
                t[0] = F[i,j] - mismatch / (1 + len(set(y[j].split("-")).intersection(set(x[i].split("-")))))
            t[1] = F[i,j+1] - gap
            t[2] = F[i+1, j] - gap
            tmax = np.max(t)
            F[i+1,j+1] = tmax
            if t[0] == tmax:
                P[i+1,j+1] += 2
            if t[1] == tmax:
                P[i+1,j+1] += 3
            if t[2] == tmax:
                P[i+1,j+1] += 4
    # print(F)
    # Trace through an optimal alignment.
    i = nx
    j = ny
    rx = []
    ry = []
    while i > 0 or j > 0:
        if P[i,j] in [2, 5, 6, 9]:
            rx.append(inx[i-1])
            ry.append(iny[j-1])
            i -= 1
            j -= 1
        elif P[i,j] in [4, 6, 7, 9]:
            rx.append('_')
            ry.append(iny[j-1])
            j -= 1
        elif P[i,j] in [3, 5, 7, 9]:
            rx.append(inx[i-1])
            ry.append('DEL')
            i -= 1


    for idx in range(len(ry)):
        if ry[idx] == 'DEL':
            # if rx[idx] == '_':
            rx[idx] = 'DEL'
            # else:
            #     idx2 = idx
            #     while rx[idx2] != "_":
            #         idx2 -= 1
            #     rx[idx2] = 'DEL'
    rx = list(filter(lambda w: w != 'DEL', rx))
    ry = list(filter(lambda w: w != 'DEL', ry))
    rx = rx[::-1]
    ry = ry[::-1]
    assert (np.array(list(seq2seqstring(ry))) != np.array(seq2seqstring(iny))).mean() == 0.0

    return rx

def from_stackedpitch_representation(inmusic, rx):
    note_seq = []
    for i, x in enumerate(rx):
        if x == '_':
            continue
        for note in x:
            note.time = i
            note.duration = max(note.duration, 1)
            note_seq.append(note)
    inmusic.tracks[0].notes = note_seq
    return inmusic

In [5]:
true = muspy.read_musicxml("../data/xml_files/stones.xml")

In [10]:
x, xtrue = clean_output("../data/xml_files/stones.xml", "../data/outputs/julian_output/stone_output (2).txt", resolution=8)

In [6]:
x = parse_input(muspy.read_musicxml("../data/xml_files/stones.xml"), "../data/outputs/julian_output/stone_output (2).txt", resolution=12)

In [119]:
nextt = None
for i,  (bar, nextb) in enumerate(zip(x.barlines, x.barlines[1:] + [muspy.Barline(x.barlines[-1].time + 1000)])):
    bartrue = set([(z.time, z.pitch) for z in filter(lambda y: bar.time <= y.time < nextb.time, xtrue.tracks[0].notes)])
    barinp = set([(z.time, z.pitch) for z in filter(lambda y: bar.time <= y.time < nextb.time, x.tracks[0].notes)])
    #barinp = list([int(not z.label) for z in filter(lambda y: prevt <= y.time < bar.time, x.tracks[0].notes)])
    print(f"Bar {i}: {np.abs(len(barinp ^ bartrue)) / ((nextb.time - bar.time))}")
    prevt = bar.time
    x.barlines[i].score = np.abs(len(barinp ^ bartrue)) / (len(bartrue))


Bar 0: 0.0
Bar 1: 0.16666666666666666
Bar 2: 0.0
Bar 3: 0.16666666666666666
Bar 4: 0.041666666666666664
Bar 5: 0.0
Bar 6: 0.08333333333333333
Bar 7: 0.041666666666666664
Bar 8: 0.041666666666666664
Bar 9: 0.125
Bar 10: 0.25
Bar 11: 0.08333333333333333
Bar 12: 0.125
Bar 13: 0.16666666666666666
Bar 14: 0.25
Bar 15: 0.041666666666666664
Bar 16: 0.08333333333333333
Bar 17: 0.041666666666666664
Bar 18: 0.08333333333333333
Bar 19: 0.08333333333333333
Bar 20: 0.125
Bar 21: 0.125
Bar 22: 0.0
Bar 23: 0.0
Bar 24: 0.041666666666666664
Bar 25: 0.3333333333333333
Bar 26: 0.08333333333333333
Bar 27: 0.08333333333333333
Bar 28: 0.20833333333333334
Bar 29: 0.20833333333333334
Bar 30: 0.008


In [7]:
x.write_midi("stone2_julian.mid")