# BPSD: Score-Audio Synchronization

- Align notes from score (in csv format) with audio recordings, using measure positions as anchors.
- Requirements: the Sync Toolbox package (https://github.com/meinardmueller/synctoolbox)

[1] Sebastian Ewert, Meinard Müller, and Peter Grosche. "High resolution audio synchronization using chroma onset features." 2009 IEEE International Conference on Acoustics, Speech and Signal Processing. IEEE, 2009.  

[2] Thomas Prätzlich, Jonathan Driedger, and Meinard Müller. "Memory-restricted multiscale dynamic time warping." 2016 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP). IEEE, 2016.

[3] Meinard Müller et al. "Sync Toolbox: A Python package for efficient, robust, and accurate music synchronization." Journal of Open Source Software 6.64 (2021): 3434.

Johannes Zeitler (johannes.zeitler@audiolabs-erlangen.de), 2024

In [1]:
import sys
import os


sys.path.append("path_to_synctoolbox")

In [2]:
# Loading some modules and defining some constants used later
import IPython.display as ipd
from libfmp.b import list_to_pitch_activations, plot_chromagram, plot_signal, plot_matrix, \
                     sonify_pitch_activations_with_signal, read_csv
import libfmp.c2
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.interpolate
from scipy.io import wavfile

from synctoolbox.dtw.mrmsdtw import sync_via_mrmsdtw, __check_anchor_pairs, __split_features, __diagonal_warping_path, sync_via_mrmsdtw_with_anchors
from synctoolbox.dtw.utils import compute_optimal_chroma_shift, shift_chroma_vectors, make_path_strictly_monotonic
from synctoolbox.feature.csv_tools import read_csv_to_df, df_to_pitch_features, df_to_pitch_onset_features
from synctoolbox.feature.chroma import pitch_to_chroma, quantize_chroma, quantized_chroma_to_CENS
from synctoolbox.feature.dlnco import pitch_onset_features_to_DLNCO, __visualize_LN_features
from synctoolbox.feature.pitch import audio_to_pitch_features
from synctoolbox.feature.pitch_onset import audio_to_pitch_onset_features
from synctoolbox.feature.utils import estimate_tuning
%matplotlib inline

from typing import List, Tuple, Optional
from midi_utils import save_midi

from tqdm.notebook import tqdm
from itertools import groupby
from copy import deepcopy

from pydub import AudioSegment

In [3]:
# Sampling rate (Hz) requested when loading the audio recordings with librosa.
Fs = 22050
# Number of feature frames per second used for all chroma/onset features.
feature_rate = 50
# DTW step weights handed to sync_via_mrmsdtw_with_anchors (step_weights argument).
step_weights = np.array([1.5, 1.5, 2.0])
# Handed to the MrMsDTW call as threshold_rec (maximum rectangle area spanned
# by two consecutive alignment points).
threshold_rec = 10 ** 6

### helper functions

In [4]:
def in_arr(arr, val, tol=1e-4):
    """Check whether ``val`` occurs in ``arr`` up to an absolute tolerance.

    Parameters
    ----------
    arr : array-like
        One-dimensional collection of numeric values to search. Lists and
        tuples are accepted in addition to numpy arrays.

    val : float
        Value to look for.

    tol : float
        Maximum absolute difference for an entry to count as a match
        (default: 1e-4)

    Returns
    -------
    found : bool
        True if the entry closest to ``val`` lies within ``tol``.

    idx : int or None
        Index of that closest entry if ``found`` is True, otherwise None.
    """
    arr = np.asarray(arr)

    # np.argmin raises on empty input; an empty array simply contains nothing.
    if arr.size == 0:
        return False, None

    diffs = np.abs(arr - val)
    minIdx = np.argmin(diffs)

    if diffs[minIdx] <= tol:
        return True, minIdx
    return False, None

def isint(val, tol=1e-10):
    """Return True where ``val`` is an integer up to the absolute tolerance ``tol``.

    The distance to the *nearest* integer is used, so values slightly below an
    integer (e.g. 2.9999999999) are accepted just like values slightly above
    it. Works element-wise on numpy arrays.
    """
    return np.abs(val - np.round(val)) <= tol

In [5]:
def writeMP3(f, sr, x, normalized=False):
    """Write a numpy array of audio samples to an MP3 file via pydub.

    Parameters
    ----------
    f : str or file-like
        Export target handed to ``AudioSegment.export``.

    sr : int
        Sampling rate of ``x`` in Hz.

    x : np.ndarray
        Audio samples. An array of shape (N, 2) is written as two channels,
        everything else as a single channel.

    normalized : bool
        If True, ``x`` holds floats in [-1, 1] that are scaled to 16-bit
        integers. If False, ``x`` is cast to int16 as is (default: False)
    """
    channels = 2 if (x.ndim == 2 and x.shape[1] == 2) else 1
    if normalized:
        # Clip before the cast: a full-scale sample of exactly 1.0 (or any
        # overshoot) would otherwise wrap around to the opposite sign.
        int16_range = np.iinfo(np.int16)
        scaled = np.asarray(x, dtype=np.float64) * 2 ** 15
        y = np.clip(scaled, int16_range.min, int16_range.max).astype(np.int16)
    else:
        y = np.int16(x)
    song = AudioSegment(y.tobytes(), frame_rate=sr, sample_width=2, channels=channels)
    song.export(f, format="mp3", bitrate="192k")

In [6]:
def read_csv_to_df(csv_filepath: str = '',
                   csv_delimiter: str = ';') -> pd.DataFrame:
    """Load a .csv file with symbolic music annotations as a pandas DataFrame.

    All column names are converted to lower case. If a ``pitch`` column is
    present, it is cast to integers.

    Parameters
    ----------
    csv_filepath : str
        Location of the .csv file.

    csv_delimiter : str
        Column separator used in the .csv file (default: ';')

    Returns
    -------
    df : pd.DataFrame
        The annotations, one row per line of the .csv file.
    """
    annotations = pd.read_csv(filepath_or_buffer=csv_filepath,
                              delimiter=csv_delimiter)

    # normalize the header so that later column access is case-insensitive
    lowered_names = annotations.columns.str.lower()
    annotations.columns = lowered_names

    has_pitch = 'pitch' in annotations.columns
    if has_pitch:
        annotations['pitch'] = annotations['pitch'].astype(int)

    return annotations

In [7]:
def dynamics_to_velocity(df, velMap={"ppp":20, "pp":39, "p":61, "mp":71, "mf":84, "f":98, "fp":98, "ff": 113, "fff":127, "sf":113}):
    """Add a ``velocity`` column derived from the ``dynamics`` column.

    Every row's dynamics marking is looked up in ``velMap``; a marking that is
    missing from the map raises a KeyError. The DataFrame is modified in place
    and also returned.
    """
    df["velocity"] = [velMap[entry["dynamics"]] for _, entry in df.iterrows()]
    return df

In [8]:
def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray,
                                  f_chroma2: np.ndarray,
                                  f_onset1: np.ndarray = None,
                                  f_onset2: np.ndarray = None,
                                  input_feature_rate: int = 50,
                                  step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
                                  step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
                                  threshold_rec: int = 10000,
                                  win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
                                  downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
                                  verbose: bool = False,
                                  dtw_implementation: str = 'synctoolbox',
                                  normalize_chroma: bool = True,
                                  chroma_norm_ord: int = 2,
                                  chroma_norm_threshold: float = 0.001,
                                  visualization_title: str = "MrMsDTW result",
                                  anchor_pairs: List[Tuple] = None,
                                  linear_inp_idx: List[int] = [],
                                  alpha=0.5) -> np.ndarray:
    """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.
        MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
        regions defined by the alignment found on the previous, coarser level.
        If onset features are provided, these are used on the finest level in addition to chroma
        to provide higher synchronization accuracy.

        If anchor pairs are given, the sequences are split at the anchors, each interval is
        aligned separately, and the partial warping paths are concatenated.

        Parameters
        ----------
        f_chroma1 : np.ndarray [shape=(12, N)]
            Chroma feature matrix of the first sequence

        f_chroma2 : np.ndarray [shape=(12, M)]
            Chroma feature matrix of the second sequence

        f_onset1 : np.ndarray [shape=(L, N)]
            Onset feature matrix of the first sequence (optional, default: None)

        f_onset2 : np.ndarray [shape=(L, M)]
            Onset feature matrix of the second sequence (optional, default: None)

        input_feature_rate: int
            Input feature rate of the chroma features (default: 50)

        step_sizes: np.ndarray
            DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))

        step_weights: np.ndarray
            DTW step weights (np.array([1.0, 1.0, 1.0]))

        threshold_rec: int
            Defines the maximum area that is spanned by the rectangle of two
            consecutive elements in the alignment (default: 10000)

        win_len_smooth : np.ndarray
            Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1]))

        downsamp_smooth : np.ndarray
            Downsampling factors (default: np.array([50, 25, 5, 1]))

        verbose : bool
            Set `True` for visualization (default: False)

        dtw_implementation : str
            DTW implementation, librosa or synctoolbox (default: synctoolbox)

        normalize_chroma : bool
            Set `True` to normalize input chroma features after each downsampling
            and smoothing operation.

        chroma_norm_ord: int
            Order of chroma normalization, relevant if ``normalize_chroma`` is True.
            (default: 2)

        chroma_norm_threshold: float
            If the norm falls below threshold for a feature vector, then the
            normalized feature vector is set to be the unit vector. Relevant, if
            ``normalize_chroma`` is True (default: 0.001)

        visualization_title : str
            Title for the visualization plots. Only relevant if 'verbose' is True
            (default: "MrMsDTW result"). Only forwarded when no anchor pairs are given.

        anchor_pairs: List[Tuple]
            Anchor pairs given in seconds. Note that
            * (0, 0) and (<audio-len1>, <audio-len2>) are not allowed.
            * Anchors must be monotonously increasing.
            The list passed by the caller is not modified.

        linear_inp_idx: List[int]
            List of the indices of intervals created by anchor pairs, for which
            MrMsDTW shouldn't be run, e.g., if the interval only involves silence.

            0        ap1        ap2        ap3
            |         |          |          |
            |  idx0   |   idx1   |  idx2    |  idx3 OR idx-1
            |         |          |          |

            Note that index -1 corresponds to the last interval, which begins with
            the last anchor pair until the end of the audio files.

        alpha: float
            Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm.
            C = alpha * C_Chroma + (1 - alpha) * C_act  (default: 0.5)

        Returns
        -------
        wp : np.ndarray [shape=(2, T)]
            Resulting warping path which indicates synchronized indices.
    """

    if anchor_pairs is None:
        wp = sync_via_mrmsdtw(f_chroma1=f_chroma1,
                              f_chroma2=f_chroma2,
                              f_onset1=f_onset1,
                              f_onset2=f_onset2,
                              input_feature_rate=input_feature_rate,
                              step_sizes=step_sizes,
                              step_weights=step_weights,
                              threshold_rec=threshold_rec,
                              win_len_smooth=win_len_smooth,
                              downsamp_smooth=downsamp_smooth,
                              verbose=verbose,
                              dtw_implementation=dtw_implementation,
                              normalize_chroma=normalize_chroma,
                              chroma_norm_ord=chroma_norm_ord,
                              chroma_norm_threshold=chroma_norm_threshold,
                              visualization_title=visualization_title,
                              alpha=alpha)
    else:
        wp = None

        if verbose:
            print('Anchor points are given!')

        # Work on a private copy: the end anchor below is only added under a
        # condition, so appending to (and later popping from) the caller's
        # list would delete one of the caller's anchors whenever nothing was
        # appended.
        anchor_pairs = list(anchor_pairs)

        # Add ending as the anchor point, unless the last anchor already lies
        # within one frame of the end of both sequences
        if (anchor_pairs[-1][0] < f_chroma1.shape[1]/input_feature_rate - 1/input_feature_rate) or (anchor_pairs[-1][1] < f_chroma2.shape[1]/input_feature_rate - 1/input_feature_rate):
            anchor_pairs.append((f_chroma1.shape[1]/input_feature_rate, f_chroma2.shape[1]/input_feature_rate))

        prev_a1 = 0
        prev_a2 = 0

        flag_quit = False

        for idx, anchor_pair in enumerate(anchor_pairs):
            cur_a1, cur_a2 = anchor_pair

            if cur_a1 == 0:
                # Anchor at time 0 of sequence 1: only sequence 2 advances,
                # sequence 1 is marked with index -1.
                wp_cur = np.concatenate([-np.ones(int(cur_a2*input_feature_rate))[None,:],
                                         np.arange(int(cur_a2*input_feature_rate))[None,:]],
                                        axis=0)
            elif cur_a2 == 0:
                # Anchor at time 0 of sequence 2: only sequence 1 advances,
                # sequence 2 is marked with index -1.
                wp_cur = np.concatenate([np.arange(int(cur_a1*input_feature_rate))[None,:],
                                        -np.ones(int(cur_a1*input_feature_rate))[None,:]],
                                        axis=0)

            elif  (prev_a1 - f_chroma1.shape[1]/input_feature_rate) >= -1/input_feature_rate:
                # Previous anchor is already (within one frame) at the end of
                # sequence 1: only the remainder of sequence 2 is left.
                indices_2 = np.arange( f_chroma2.shape[1] - int(prev_a2*input_feature_rate))

                # NOTE(review): both inputs are 1-D, so this concatenation
                # yields a 1-D array of length 2*K rather than a (2, K) path;
                # confirm whether stacking along a new axis was intended.
                wp_cur = np.concatenate([ np.ones_like(indices_2),
                                          indices_2], axis=0)

                flag_quit=True

            elif  (prev_a2 - f_chroma2.shape[1]/input_feature_rate) >= -1/input_feature_rate:
                # Previous anchor is already (within one frame) at the end of
                # sequence 2: only the remainder of sequence 1 is left.
                indices_1 = np.arange( f_chroma1.shape[1] - int(prev_a1*input_feature_rate))

                # NOTE(review): same 1-D concatenation as in the branch above.
                wp_cur = np.concatenate([indices_1,
                                        np.ones_like(indices_1),
                                          ], axis=0)

                flag_quit=True

            else:

                # Split the features
                f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1,
                                                                                                    f_onset1,
                                                                                                    f_chroma2,
                                                                                                    f_onset2,
                                                                                                    cur_a1,
                                                                                                    cur_a2,
                                                                                                    prev_a1,
                                                                                                    prev_a2,
                                                                                                    input_feature_rate)

                if idx in linear_inp_idx or idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx:
                    # Generate a diagonal warping path, if the algorithm is not supposed to executed.
                    # A typical scenario is the silence breaks which are enclosed by two anchor points.
                    if verbose:
                        print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f'
                              '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
                    wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split)

                else:
                    if verbose:
                        if cur_a1 != -1 and cur_a2 != -1:
                            print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f'
                                  '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
                        else:
                            print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end'
                                  '\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2))
                    wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split,
                                              f_chroma2=f_chroma2_split,
                                              f_onset1=f_onset1_split,
                                              f_onset2=f_onset2_split,
                                              input_feature_rate=input_feature_rate,
                                              step_sizes=step_sizes,
                                              step_weights=step_weights,
                                              threshold_rec=threshold_rec,
                                              win_len_smooth=win_len_smooth,
                                              downsamp_smooth=downsamp_smooth,
                                              verbose=verbose,
                                              dtw_implementation=dtw_implementation,
                                              normalize_chroma=normalize_chroma,
                                              chroma_norm_ord=chroma_norm_ord,
                                              chroma_norm_threshold=chroma_norm_threshold,
                                              alpha=alpha)

            if wp is None:
                wp = np.array(wp_cur, copy=True)

            # Concatenate warping paths: the partial path is relative to its
            # interval, so shift it behind the last index pair found so far
            else:
                wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1)

            prev_a1 = cur_a1
            prev_a2 = cur_a2

            if flag_quit:
                break

    return wp

In [9]:
def pitch_onset_matrix_to_DLNCO(pitch_onset_matrix : np.ndarray,
                                  feature_sequence_length: int,
                                  feature_rate: int = 50,
                                  midi_min: int = 21,
                                  midi_max: int = 108,
                                  log_compression_gamma: float = 10000.0,
                                  chroma_norm_ord: int = 2,
                                  LN_maxfilterlength_seconds: float = 0.8,
                                  LN_maxfilterthresh: float = 0.1,
                                  DLNCO_filtercoef: np.ndarray = np.sqrt(1 / np.arange(1, 11)),
                                  visualize=False) -> np.ndarray:
    """Computes decaying locally adaptive normalized chroma onset (DLNCO) features
    from a pitch-wise onset activation matrix.

    Parameters
    ----------
    pitch_onset_matrix : np.ndarray
        Onset activations per pitch and frame. It is handed to ``pitch_to_chroma``
        to obtain chroma onsets (the caller in this file passes a matrix with
        128 pitch rows; other layouts are not verified here).

    feature_sequence_length : int
        Number of frames that are processed and returned.

    feature_rate : int
        Features per second; used to convert the maximum filter length to frames
        and for plotting.

    midi_min : int
        Not used by this function (default: 21)

    midi_max : int
        Not used by this function (default: 108)

    log_compression_gamma : float
        Not used by this function.

    chroma_norm_ord : int
        Order of the norm used for chroma onset vectors.

    LN_maxfilterlength_seconds : float
        Length of the maximum filter applied for determining local norm of chroma onsets in seconds.

    LN_maxfilterthresh : float
        Minimum threshold for normalizing chroma onsets using local norm.

    DLNCO_filtercoef : np.ndarray
        Sequence of decay coefficients applied on normalized chroma onsets.

    visualize : bool
        Set `True` to visualize chroma onset features (Default: False)

    Returns
    -------
    f_DLNCO : np.array [shape=(12, feature_sequence_length)]
        Decaying Locally adaptively Normalized Chroma Onset features
    """
    n_frames = feature_sequence_length

    # Chroma onsets with time running along the first axis
    chroma_onsets = pitch_to_chroma(pitch_onset_matrix).T

    # Frame-wise norm of the chroma onset vectors
    frame_norm = np.zeros(n_frames)
    for frame in range(n_frames):
        frame_norm[frame] = np.linalg.norm(chroma_onsets[frame, :], chroma_norm_ord)

    # Half width (in frames) of the maximum filter; the full length is forced
    # to be even before halving
    window_frames = int(LN_maxfilterlength_seconds * feature_rate)
    window_frames -= window_frames % 2
    half_window = window_frames // 2

    # Local normalizing curve: running maximum of the frame norms, obtained by
    # repeatedly shifting one copy forwards and one copy backwards by a frame
    # TODO improve with scipy.ndimage.maximum_filter
    local_norm = frame_norm.copy()
    shifted_fwd = frame_norm.copy()
    shifted_bwd = frame_norm.copy()
    for _ in range(half_window):
        shifted_fwd = np.roll(shifted_fwd, 1, axis=0)
        shifted_fwd[0] = 0
        shifted_bwd = np.roll(shifted_bwd, -1, axis=0)
        shifted_bwd[-1] = 0
        local_norm = np.maximum(local_norm, np.maximum(shifted_fwd, shifted_bwd))

    # Lower bound on the normalizer so that near-silent regions are not blown up
    local_norm = np.maximum(local_norm, LN_maxfilterthresh)

    # Locally normalized chroma onsets (LNCO)
    locally_normalized = np.zeros((n_frames, 12))
    for frame in range(n_frames):
        locally_normalized[frame, :] = chroma_onsets[frame, :] / local_norm[frame]

    # Decaying version (DLNCO): per chroma band, the maximum over delayed and
    # attenuated copies of the LNCO curve
    decayed = np.zeros((n_frames, 12))
    n_coef = DLNCO_filtercoef.size
    for chroma_bin in range(12):
        delayed = locally_normalized[:, chroma_bin].copy()
        candidates = np.zeros((n_frames, n_coef))

        for tap in range(n_coef):
            candidates[:, tap] = DLNCO_filtercoef[tap] * delayed
            delayed = np.roll(delayed, 1)
            delayed[0] = 0

        decayed[:, chroma_bin] = np.max(candidates, axis=1)

    # visualization
    if visualize:
        plot_chromagram(X=chroma_onsets.T, title='CO', colorbar=True, Fs=feature_rate, colorbar_aspect=50, figsize=(9, 3))
        __visualize_LN_features(frame_norm, local_norm, feature_sequence_length, feature_rate)
        plot_chromagram(X=locally_normalized.T, title='LNCO', colorbar=True, Fs=feature_rate, colorbar_aspect=50, figsize=(9, 3))
        plot_chromagram(X=decayed.T, title='DLNCO', colorbar=True, Fs=feature_rate, colorbar_aspect=50, figsize=(9, 3))

    return decayed.T

In [10]:
def get_features_from_annotation(df_annotation, feature_rate, visualize=True):
    """Compute quantized chroma and DLNCO features from a note annotation table.

    If the annotation has no ``velocity`` column, one is added in place with a
    constant value of 64 before the features are computed.

    Returns
    -------
    f_chroma_quantized, f_DLNCO
        Quantized chroma features and DLNCO onset features; the DLNCO sequence
        length is set to the number of chroma frames.
    """
    if "velocity" not in df_annotation:
        df_annotation["velocity"] = 64

    # chroma branch: pitch features -> chroma -> quantized chroma
    pitch_features = df_to_pitch_features(df_annotation, feature_rate=feature_rate)
    chroma_quantized = quantize_chroma(f_chroma=pitch_to_chroma(f_pitch=pitch_features))
    if visualize:
        plot_chromagram(chroma_quantized, title='Quantized chroma features - Annotation', Fs=feature_rate, figsize=(9, 3))

    # onset branch: pitch-wise onset peaks -> DLNCO
    onset_peaks = df_to_pitch_onset_features(df_annotation)
    n_frames = chroma_quantized.shape[1]
    dlnco = pitch_onset_features_to_DLNCO(f_peaks=onset_peaks,
                                          feature_rate=feature_rate,
                                          feature_sequence_length=n_frames,
                                          visualize=visualize)

    return chroma_quantized, dlnco

In [13]:
# Directory from which the .wav recordings are listed and loaded.
audio_dir = os.path.join("../", "1_Audio")
# Directory with the score note annotations (one <piece>.csv per piece).
csv_in_dir_measures = os.path.join("../", "2_Annotations", "ann_score_note")
# Directory of start/end annotations for the audio (named by its path; not referenced in the cells shown here).
anchor_dir = os.path.join("../", "2_Annotations", "ann_audio_startEnd")



### Fine-tuned transcription features, measure-wise anchor points

In [25]:
# Output directory for the synchronized note annotations (<piece>_<performer>.csv).
note_out_dir = os.path.join("../", "2_Annotations", "ann_audio_note")
# Directory with the transcription results (<piece>_<performer>.npz); placeholder to be replaced.
onset_frames_dir = "path_to_transcription_results"
# Directory with the measure annotations of the recordings (<piece>_<performer>.csv).
measure_in_dir = os.path.join("../", "2_Annotations", "ann_audio_measure")

In [26]:
# Audio files are named <piece>_<performer>.wav: everything before the last
# underscore identifies the piece, the part after it (without the extension)
# identifies the performer.
pieces = sorted({"_".join(f.split("_")[:-1]) for f in os.listdir(audio_dir) if ".wav" in f})

performers = sorted({f.split(".")[0].split("_")[-1] for f in os.listdir(audio_dir) if ".wav" in f})

In [None]:
# For every recording: place the score notes on the audio time axis using the
# annotated measure positions, refine the note positions with MrMsDTW between
# transcription features and score features (measure positions as anchors),
# and write the warped note annotations to note_out_dir.
for performer in performers:

    for piece in tqdm(pieces):
        # NOTE(review): only pieces whose name contains "026" are processed;
        # this looks like a leftover debugging filter - confirm before a full run.
        if "026" not in piece:
            continue
        print("Processing %s_%s"%(piece, performer))
        # The sampling rate must be passed by keyword (keyword-only in librosa >= 0.10).
        audio, _ = librosa.load(os.path.join(audio_dir, "%s_%s.wav"%(piece, performer)), sr=Fs)

        df_annotation = read_csv_to_df(os.path.join(csv_in_dir_measures, piece+".csv"), csv_delimiter=';')
        quarterNoteOffset=[]
        beat_fac = 1
        curr_meas=0.
        meas_offset=0

        measuresIn = read_csv_to_df(os.path.join(measure_in_dir, "%s_%s.csv"%(piece, performer)), csv_delimiter=";")
        measuresIn.sort_values(by="measure", inplace=True)

        # Add artificial measures at the start (measure 0 at 0 s) and at the
        # end of the audio (last measure + 1), so that every annotated
        # measure has a successor.
        measuresIn = pd.concat([pd.DataFrame({"time":[0, len(audio)/Fs], "measure":[0, int(max(measuresIn.measure))+1]}), measuresIn])

        measuresIn.sort_values(by="measure", inplace=True)

        meas_start_time = []
        beat_fac_list = []
        quarterNoteOffset_in_measure = []

        for i, row in df_annotation.iterrows():

            # if new measure starts
            if np.floor(row.start_meas) > curr_meas:
                curr_meas = np.floor(row.start_meas)
                meas_offset += beat_fac*4

            meas_start_time.append(measuresIn.time[measuresIn.measure==curr_meas].item())

            # time signature as a fraction, e.g. "3/4" -> 0.75
            beat_fac = int(row.timesig.split("/")[0])/int(row.timesig.split("/")[1])
            quarterNoteOffset.append( meas_offset+(row.start_meas-curr_meas)*beat_fac*4)
            quarterNoteOffset_in_measure.append((row.start_meas-curr_meas)*beat_fac*4)

            beat_fac_list.append(beat_fac)

        df_annotation["quarternoteoffset"] = quarterNoteOffset
        df_annotation["quarternoteoffset"] -= min(df_annotation["quarternoteoffset"])

        df_annotation["measstarttime"] = meas_start_time
        df_annotation["quarternoteoffsetinmeasure"] = quarterNoteOffset_in_measure
        #########################################################################

        # Duration (in seconds) of the measure each note starts in
        measure_duration = []
        for _, row in df_annotation.iterrows():
            rowMeas = int(row.start_meas)
            measure_duration.append( measuresIn.time[measuresIn.measure==(rowMeas+1)].item() - measuresIn.time[measuresIn.measure==rowMeas].item())

        df_annotation["beatfac"] = beat_fac_list
        df_annotation["secondspermeasure"] = measure_duration
        df_annotation["secondsperbeat"] = df_annotation["secondspermeasure"] / 4 / df_annotation["beatfac"]

        # Linear interpolation of note positions inside each measure
        df_annotation["start"] = df_annotation["measstarttime"] + df_annotation["quarternoteoffsetinmeasure"]*df_annotation["secondsperbeat"]
        df_annotation["duration"] = df_annotation["duration_quarterlength"]*df_annotation["secondsperbeat"] - 0.01

        # Staccato notes get half their nominal duration. Use .loc: chained
        # assignment (df.duration[mask] /= 2) does not reliably write back to
        # the DataFrame.
        df_annotation.loc[df_annotation.articulation == "staccato", "duration"] /= 2

        df_annotation["end"] = df_annotation["start"] + df_annotation["duration"]

        df_annotation["instrument"] = "piano"


        # Transcription results: onset and frame predictions per pitch
        featuresIn = np.load(os.path.join(onset_frames_dir, "%s_%s.npz"%(piece, performer)))

        onsetsIn = featuresIn["onset_pred"].T
        framesIn = featuresIn["frame_pred"].T

        frame_rate_in = featuresIn["sample_rate"] / featuresIn["hop_length"]

        # Resample the predictions to feature_rate by picking, for every
        # target frame, the transcription frame it falls into
        idx_interp = np.floor(np.arange(0, len(audio)/Fs, 1/feature_rate)*frame_rate_in).astype(int)

        onsetsIn_res = onsetsIn[:,idx_interp]
        framesIn_res = framesIn[:,idx_interp]

        # Embed the predictions into 128 MIDI pitch rows (rows 21..108)
        f_frames_transcription = np.zeros((128, framesIn_res.shape[1]))
        f_frames_transcription[21:109,:] = framesIn_res

        f_onsets_transcription = np.zeros((128, onsetsIn_res.shape[1]))
        f_onsets_transcription[21:109,:] = onsetsIn_res


        f_chroma_transcriptions = pitch_to_chroma(f_pitch=f_frames_transcription)
        f_chroma_quantized_transcription = quantize_chroma(f_chroma = f_chroma_transcriptions)


        f_DLNCO_transcription = pitch_onset_matrix_to_DLNCO(pitch_onset_matrix = f_onsets_transcription, feature_rate=feature_rate,
                                                              feature_sequence_length=f_chroma_quantized_transcription.shape[1], visualize=False)

        f_chroma_quantized_annotation, f_DLNCO_annotation = get_features_from_annotation(df_annotation, feature_rate, visualize=False)


        # Zero-pad the score features to the length of the transcription features
        if f_chroma_quantized_annotation.shape[1] < f_chroma_quantized_transcription.shape[1]:
            f_chroma_quantized_annotation = np.concatenate([f_chroma_quantized_annotation, np.zeros((12, f_chroma_quantized_transcription.shape[1] - f_chroma_quantized_annotation.shape[1]))], axis=-1)
            f_DLNCO_annotation = np.concatenate([f_DLNCO_annotation, np.zeros((12, f_DLNCO_transcription.shape[1] - f_DLNCO_annotation.shape[1]))], axis=-1)

        # Unique (measure position, time) pairs of the score notes, extended by
        # the annotated measure positions that carry no note onset
        start_pairs = [[start_m, start_s] for start_m, start_s in zip(df_annotation.start_meas, df_annotation.start)]
        start_pairs.sort()
        start_pairs_unique = np.array([pair for pair,_ in groupby(start_pairs)])
        for _, row in measuresIn.iterrows():
            isInArr, idx = in_arr(start_pairs_unique[:,0], row.measure)
            if not isInArr:
                start_pairs_unique = np.concatenate([start_pairs_unique, np.array([row.measure, row.time])[None,:]], axis=0)
        # NOTE(review): sort(axis=0) sorts each column independently; rows stay
        # paired only if time increases monotonically with measure position.
        start_pairs_unique.sort(axis=0)

        meas_to_time_annot = scipy.interpolate.interp1d(start_pairs_unique[:,0], start_pairs_unique[:,1],
                                                  kind='linear', bounds_error=False, fill_value='extrapolate')

        # Anchors: (measure time in the recording, time of that measure in the
        # score features); the artificial first and last measures are dropped
        anchor_pairs = [(row.time, meas_to_time_annot(row.measure).item()) for _,row in measuresIn.iterrows()][1:-1]

        anchor_pairs[-1] = (max(measuresIn.iloc[:-1].time), max(df_annotation.end))


        wp_transcription_weakMonotonic = sync_via_mrmsdtw_with_anchors(f_chroma1=f_chroma_quantized_transcription,
                              f_onset1=f_DLNCO_transcription,
                              f_chroma2=f_chroma_quantized_annotation,
                              f_onset2=f_DLNCO_annotation,
                              input_feature_rate=feature_rate,
                              step_weights=step_weights,
                              threshold_rec=threshold_rec,
                              verbose=False,
                              anchor_pairs=anchor_pairs)


        # Map note start/end times through the (strictly monotonic) warping path
        wp_transcription = make_path_strictly_monotonic(deepcopy(wp_transcription_weakMonotonic))
        df_annotation_warped_transcription = df_annotation.copy(deep=True)
        df_annotation_warped_transcription["end"] = df_annotation_warped_transcription["start"] + df_annotation_warped_transcription["duration"]
        df_annotation_warped_transcription[['start', 'end']] = scipy.interpolate.interp1d(wp_transcription[1] / feature_rate,
                                   wp_transcription[0] / feature_rate, kind='linear', bounds_error=False, fill_value="extrapolate")(df_annotation[['start', 'end']])

        # Clip note ends to the last annotated measure position (row -2; row -1
        # is the artificial end-of-audio measure). .loc instead of chained
        # assignment so that the DataFrame itself is updated.
        last_measure_time = measuresIn.iloc[-2].time
        df_annotation_warped_transcription.loc[df_annotation_warped_transcription.end > last_measure_time, "end"] = last_measure_time

        df_annotation_warped_transcription["duration"] = df_annotation_warped_transcription["end"] - df_annotation_warped_transcription["start"]


        df_annotation_warped_transcription.sort_values(by="start", inplace=True)


        df_annotation_warped_transcription.to_csv(os.path.join(note_out_dir, piece+"_"+performer+".csv"), sep=";", index=False,
                                         columns=["start", "end",
                                                  "start_meas", "end_meas",
                                                  "pitch", "pitchname",
                                                  "timesig", "articulation"],
                                         float_format="%07.3f")
        


### Fine-Tuned transcription features, anchor points for start/end only

... for evaluation of synchronization accuracy

In [28]:
# Redirect the note output to a separate directory for the start/end-anchor-only run.
note_out_dir = os.path.join("../", "4_misc", "ann_audio_note_noAnchor")

In [29]:
# Pieces: base names of the .csv files in the score chord annotations.
pieces = sorted(f.split(".")[0] for f in os.listdir(os.path.join("../", "2_Annotations", "ann_score_chord")) if ".csv" in f)

# Performers: suffix after the last underscore of the audio measure annotation file names.
performers = sorted({f.split(".")[0].split("_")[-1] for f in os.listdir(os.path.join("../", "2_Annotations", "ann_audio_measure"))})

In [None]:
# Score-audio synchronization using only the first and last anchor pair.
#
# For every (performer, piece) combination this cell
#   1. loads the audio and the score note annotation,
#   2. places the score notes on a time axis by linear interpolation between the annotated measure times,
#   3. loads precomputed transcription features (onset/frame predictions) and converts them to
#      quantized chroma and DLNCO features,
#   4. computes the corresponding features from the note annotation,
#   5. runs sync_via_mrmsdtw_with_anchors with only the first and last anchor pair,
#   6. warps the note start/end times along the resulting path and writes them to a csv file.

# Make sure the output folder exists; otherwise to_csv fails only after the whole computation.
os.makedirs(note_out_dir, exist_ok=True)

for performer in performers:

    for piece in tqdm(pieces):
        # Only pieces whose identifier contains "026" are processed.
        # NOTE(review): looks like a leftover debugging filter -- confirm before running on all pieces.
        if "026" not in piece:
            continue
        print("Processing %s_%s"%(piece, performer))
        # The sampling rate is passed by keyword: it is keyword-only in recent librosa versions.
        audio, _ = librosa.load(os.path.join(audio_dir, "%s_%s.wav"%(piece, performer)), sr=Fs)

        df_annotation = read_csv_to_df(os.path.join(csv_in_dir_measures, piece+".csv"), csv_delimiter=';')

        quarterNoteOffset=[]
        beat_fac = 1
        curr_meas=0.
        meas_offset=0

        measuresIn = read_csv_to_df(os.path.join("../", "2_Annotations", "ann_audio_measure", "%s_%s.csv"%(piece, performer)), csv_delimiter=";")

        # Add two artificial measure positions: measure 0 at time 0 and (last measure + 1) at the end of the audio.
        measuresIn = pd.concat([pd.DataFrame({"time":[0, len(audio)/Fs], "measure":[0, int(max(measuresIn.measure))+1]}), measuresIn])
        measuresIn.sort_values(by="measure", inplace=True)

        meas_start_time = []
        beat_fac_list = []
        quarterNoteOffset_in_measure = []

        for _, row in df_annotation.iterrows():

            # if new measure starts
            if np.floor(row.start_meas) > curr_meas:
                curr_meas = np.floor(row.start_meas)
                # advance by the length of the previous measure in quarter notes
                # (beat_fac still holds the value of the previously processed row)
                meas_offset += beat_fac*4

            # annotated time of the start of the current measure
            meas_start_time.append(measuresIn.time[measuresIn.measure==curr_meas].item())

            # time signature "a/b" -> a/b; multiplied by 4 this is the measure length in quarter notes
            beat_fac = int(row.timesig.split("/")[0])/int(row.timesig.split("/")[1])
            # (start_meas - curr_meas) is the fractional position of the note inside its measure
            quarterNoteOffset.append( meas_offset+(row.start_meas-curr_meas)*beat_fac*4)
            quarterNoteOffset_in_measure.append((row.start_meas-curr_meas)*beat_fac*4)

            beat_fac_list.append(beat_fac)

        df_annotation["quarternoteoffset"] = quarterNoteOffset
        df_annotation["quarternoteoffset"] -= min(df_annotation["quarternoteoffset"])

        df_annotation["measstarttime"] = meas_start_time
        df_annotation["quarternoteoffsetinmeasure"] = quarterNoteOffset_in_measure
        #########################################################################

        # Duration of each note's measure: time of the next measure minus time of the note's own measure.
        measure_duration = []
        for _, row in df_annotation.iterrows():
            rowMeas = int(row.start_meas)
            measure_duration.append( measuresIn.time[measuresIn.measure==(rowMeas+1)].item() - measuresIn.time[measuresIn.measure==rowMeas].item())

        df_annotation["beatfac"] = beat_fac_list
        df_annotation["secondspermeasure"] = measure_duration
        # seconds per quarter note inside the respective measure
        df_annotation["secondsperbeat"] = df_annotation["secondspermeasure"] / 4 / df_annotation["beatfac"]

        # Linear interpolation of the note positions between the measure start times.
        df_annotation["start"] = df_annotation["measstarttime"] + df_annotation["quarternoteoffsetinmeasure"]*df_annotation["secondsperbeat"]
        # Each note is shortened by 10 ms.
        df_annotation["duration"] = df_annotation["duration_quarterlength"]*df_annotation["secondsperbeat"] - 0.01

        # Staccato notes get half their duration. .loc is used instead of chained indexing
        # (df.duration[mask] /= 2), which is not guaranteed to write back into the DataFrame.
        df_annotation.loc[df_annotation.articulation == "staccato", "duration"] /= 2

        df_annotation["end"] = df_annotation["start"] + df_annotation["duration"]

        df_annotation["instrument"] = "piano"

        # Precomputed transcription features; the context manager closes the npz file after reading.
        with np.load(os.path.join(onset_frames_dir, "%s_%s.npz"%(piece, performer))) as featuresIn:
            onsetsIn = featuresIn["onset_pred"].T
            framesIn = featuresIn["frame_pred"].T

            frame_rate_in = featuresIn["sample_rate"] / featuresIn["hop_length"]

        # Resample the predictions to feature_rate by picking, for every output frame,
        # the input frame at the same time position (index rounded down).
        idx_interp = np.floor(np.arange(0, len(audio)/Fs, 1/feature_rate)*frame_rate_in).astype(int)

        onsetsIn_res = onsetsIn[:,idx_interp]
        framesIn_res = framesIn[:,idx_interp]

        # Embed the predictions into rows 21..108 of a 128-row matrix
        # (presumably MIDI pitch numbers of the piano range -- TODO confirm).
        f_frames_transcription = np.zeros((128, framesIn_res.shape[1]))
        f_frames_transcription[21:109,:] = framesIn_res

        f_onsets_transcription = np.zeros((128, onsetsIn_res.shape[1]))
        f_onsets_transcription[21:109,:] = onsetsIn_res

        f_chroma_transcriptions = pitch_to_chroma(f_pitch=f_frames_transcription)
        f_chroma_quantized_transcription = quantize_chroma(f_chroma = f_chroma_transcriptions)

        f_DLNCO_transcription = pitch_onset_matrix_to_DLNCO(pitch_onset_matrix = f_onsets_transcription, feature_rate=feature_rate,
                                                              feature_sequence_length=f_chroma_quantized_transcription.shape[1], visualize=False)

        f_chroma_quantized_annotation, f_DLNCO_annotation = get_features_from_annotation(df_annotation, feature_rate, visualize=False)

        # Zero-pad the annotation features if they are shorter than the transcription features.
        if f_chroma_quantized_annotation.shape[1] < f_chroma_quantized_transcription.shape[1]:
            f_chroma_quantized_annotation = np.concatenate([f_chroma_quantized_annotation, np.zeros((12, f_chroma_quantized_transcription.shape[1] - f_chroma_quantized_annotation.shape[1]))], axis=-1)
            f_DLNCO_annotation = np.concatenate([f_DLNCO_annotation, np.zeros((12, f_DLNCO_transcription.shape[1] - f_DLNCO_annotation.shape[1]))], axis=-1)

        # Unique (measure position, time) pairs of all note starts ...
        start_pairs = [[start_m, start_s] for start_m, start_s in zip(df_annotation.start_meas, df_annotation.start)]
        start_pairs.sort()
        start_pairs_unique = np.array([pair for pair,_ in groupby(start_pairs)])
        # ... extended by every measure position for which in_arr reports no match.
        for _, row in measuresIn.iterrows():
            isInArr, _ = in_arr(start_pairs_unique[:,0], row.measure)
            if not isInArr:
                start_pairs_unique = np.concatenate([start_pairs_unique, np.array([row.measure, row.time])[None,:]], axis=0)
        # NOTE(review): ndarray.sort(axis=0) sorts each column independently; the (measure, time)
        # pairs stay intact only if time increases together with measure -- confirm.
        start_pairs_unique.sort(axis=0)

        # Piecewise-linear map from measure position to time on the annotation time axis.
        meas_to_time_annot = scipy.interpolate.interp1d(start_pairs_unique[:,0], start_pairs_unique[:,1],
                                                  kind='linear', bounds_error=False, fill_value='extrapolate')

        # One pair (time from the measure table, time on the annotation axis) per measure row;
        # [1:-1] drops the first and the last row of the sorted measure table.
        anchor_pairs = [(row.time, meas_to_time_annot(row.measure).item()) for _,row in measuresIn.iterrows()][1:-1]

        # The last pair is replaced by (largest time among all but the last measure row, latest note end).
        anchor_pairs[-1] = (max(measuresIn.iloc[:-1].time), max(df_annotation.end))

        # Only the first and the last anchor pair are passed to the synchronization.
        wp_transcription_weakMonotonic = sync_via_mrmsdtw_with_anchors(f_chroma1=f_chroma_quantized_transcription,
                              f_onset1=f_DLNCO_transcription,
                              f_chroma2=f_chroma_quantized_annotation,
                              f_onset2=f_DLNCO_annotation,
                              input_feature_rate=feature_rate,
                              step_weights=step_weights,
                              threshold_rec=threshold_rec,
                              verbose=False,
                              anchor_pairs=[anchor_pairs[0], anchor_pairs[-1]])

        wp_transcription = make_path_strictly_monotonic(deepcopy(wp_transcription_weakMonotonic))

        # Warp the note start/end times: row 1 of the path (in seconds) is the interpolation input,
        # row 0 (in seconds) the output.
        df_annotation_warped_transcription = df_annotation.copy(deep=True)
        df_annotation_warped_transcription[['start', 'end']] = scipy.interpolate.interp1d(wp_transcription[1] / feature_rate,
                                   wp_transcription[0] / feature_rate, kind='linear', bounds_error=False, fill_value="extrapolate")(df_annotation[['start', 'end']])

        # Clip note ends to the time of the second-to-last row of the measure table
        # (.loc instead of chained indexing so that the assignment reliably reaches the DataFrame).
        last_measure_time = measuresIn.iloc[-2].time
        df_annotation_warped_transcription.loc[df_annotation_warped_transcription["end"] > last_measure_time, "end"] = last_measure_time

        df_annotation_warped_transcription["duration"] = df_annotation_warped_transcription["end"] - df_annotation_warped_transcription["start"]

        df_annotation_warped_transcription.sort_values(by="start", inplace=True)

        df_annotation_warped_transcription.to_csv(os.path.join(note_out_dir, piece+"_"+performer+".csv"), sep=";", index=False,
                                         columns=["start", "end",
                                                  "start_meas", "end_meas",
                                                  "pitch", "pitchname",
                                                  "timesig", "articulation"],
                                         float_format="%07.3f")

    