In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import glob
import os.path
import pandas as pd
import pickle
from plotnine import *
from pandas.api.types import CategoricalDtype

In [None]:
unit_tests_path = Path('../ttmp/unit_tests')
unit_tests_path.mkdir(parents=True, exist_ok=True)

In [None]:
def save_annot_files(benchmark, folder_name, annot1_df, annot2_df):
    (unit_tests_path / benchmark).mkdir(exist_ok=True)
    annot1_path = unit_tests_path / benchmark / f'{folder_name}/{folder_name}_annot1.beat'
    annot1_path.parent.mkdir(parents=True, exist_ok=True)
    with open(annot1_path, 'w') as f:
        f.write('%\n')
        f.write('%\n')
        f.write('% start_time[sec]	end_time[sec]	label\n')
        annots = annot1_df.to_csv(None, header=False, index=False, sep='\t')
        f.writelines(annots)
        
    annot2_path = unit_tests_path / benchmark / f'{folder_name}/{folder_name}_annot2.beat'
    annot2_path.parent.mkdir(exist_ok=True)
    with open(annot2_path, 'w') as f:
        f.write('%\n')
        f.write('%\n')
        f.write('% start_time[sec]	end_time[sec]	label\n')
        annots = annot2_df.to_csv(None, header=False, index=False, sep='\t')
        f.writelines(annots)

In [None]:
def save_wp(benchmark, folder_name, end_second, hop_sec = 512/22050):
    path = np.arange(0, (end_second / hop_sec)+1)
    wp = np.array([path] * 2)
    with open(unit_tests_path / benchmark / folder_name / f'{folder_name}_annot1__{folder_name}_annot2.pkl', 'wb') as f:
        pickle.dump(wp, f)

## Make Files

### Exact Match

**Matching**

In [None]:
# Make two identical annot files with start,end seconds [1,2], [2,3], ..., [100,101]
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = annot1_df

In [None]:
save_annot_files('matching', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('matching', 'exact_match', 100)

**Subseq10**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = annot1_df[10:20]

In [None]:
save_annot_files('subseq10', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('subseq10', 'exact_match', 100)

In [None]:
# No Path?

**PartialOverlap**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot1_df = annot_df[:60]
annot2_df = pd.DataFrame(np.array([np.arange(0,101-41)] + [np.arange(1,102-41)] + [np.array(annot_df[2][-60:])]).T)

In [None]:
save_annot_files('partialOverlap', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('partialOverlap', 'exact_match', 100)

**Pre5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(1+5,101+5)] + [np.arange(2+5,102+5)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('pre5', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('pre5', 'exact_match', 100)

**Post5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = annot1_df

In [None]:
save_annot_files('post5', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('post5', 'exact_match', 100)

**PrePost5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(1+5,101+5)] + [np.arange(2+5,102+5)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('prepost5', 'exact_match', annot1_df, annot2_df)

In [None]:
save_wp('prepost5', 'exact_match', 100)

### Shifted by 2 sec

**Matching**

In [None]:
# Make two annot files where one's start seconds is shifted by 2
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(3,103)] + [np.arange(4,104)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('matching', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('matching', 'shifted_2',100)

**Subseq10**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(3,103)] + [np.arange(4,104)] + [np.array(measures[:100])]).T)
annot2_df = annot2_df[10:20]

In [None]:
save_annot_files('subseq10', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('subseq10', 'shifted_2',100)

In [None]:
# No Path?

**PartialOverlap**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot1_df = annot_df[:60]
annot2_df = pd.DataFrame(np.array([np.arange(2,101-41+2)] + [np.arange(3,102-41+2)] + [np.array(annot_df[2][-60:])]).T)

In [None]:
save_annot_files('partialOverlap', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('partialOverlap', 'shifted_2',100)

**Pre5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(3+5,103+5)] + [np.arange(4+5,104+5)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('pre5', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('pre5', 'shifted_2',100)

**Post5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(3,103)] + [np.arange(4,104)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('post5', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('post5', 'shifted_2',100)

**PrePost5**

In [None]:
measures = []
for i in range(100//3+1):
    measures.append(str(i)+'-1')
    measures.append(str(i)+'-2')
    measures.append(str(i)+'-3')
annot1_df = pd.DataFrame(np.array([np.arange(1,101)] + [np.arange(2,102)] + [np.array(measures[:100])]).T)
annot2_df = pd.DataFrame(np.array([np.arange(3+5,103+5)] + [np.arange(4+5,104+5)] + [np.array(measures[:100])]).T)

In [None]:
save_annot_files('prepost5', 'shifted_2', annot1_df, annot2_df)

In [None]:
save_wp('prepost5', 'shifted_2',100)

## Unit Tests

In [None]:
def eval_file(hypfile, annot_root_1, annot_root_2, hop_sec):
    parts = os.path.basename(hypfile).split('__')
    assert len(parts) == 2
    piece = extractPieceName(parts[0])
    annotfile1 = (annot_root_1 / piece / parts[0]).with_suffix('.beat')
    annotfile2 = (annot_root_2 / piece / parts[1]).with_suffix('.beat')
    
    # if groundtruth annotation files are empty, skip this hypothesis file
    try:
        gt1, gt2 = getTimestamps(annotfile1, annotfile2)
        hypalign = loadAlignment(hypfile) # warping path in frames
    except:
        print(f'Skipping hypothesis file {hypfile}')
        return None

    if hypalign is None:
        err = np.array(np.ones(gt1.shape) * np.inf)
    else:
        pred2 = np.interp(gt1, hypalign[0,:]*hop_sec, hypalign[1,:]*hop_sec)
        err = pred2 - gt2
    return err

In [None]:
def extractPieceName(fullpath):
    basename = os.path.basename(fullpath) # e.g. Chopin_Op068No3_Sztompka-1959_pid9170b-21
    parts = basename.split('_')
    piece = '_'.join(parts[0:2]) # e.g. Chopin_Op068No3
    return piece

In [None]:
def getTimestamps(annotfile1, annotfile2):
    df1 = pd.read_csv(annotfile1, header=None, sep='\s+', skiprows=3) 
    df2 = pd.read_csv(annotfile2, header=None, sep='\s+', skiprows=3)

    df_merged = pd.merge(df1, df2, on=[2], how='inner')
    df_merged = df_merged[df_merged[2].astype(str).str.match(".*\d$")]

    return df_merged['0_x'], df_merged['0_y']

In [None]:
def loadAlignment(hypfile):
    with open(hypfile, 'rb') as f:
        d = pickle.load(f)
    return d

In [None]:
def test_exact_match(hypalign, benchmark, hop_sec=512/22050):
    errs = eval_file(hypalign, unit_tests_path / benchmark, unit_tests_path / benchmark, hop_sec)
    
    if benchmark == 'matching' or benchmark == 'subseq10' or benchmark == 'post5':
        assert np.all(errs == 0)
    elif benchmark == 'partialOverlap':
        assert np.all(np.abs(errs) == 41)
    elif benchmark == 'pre5' or benchmark == 'prepost5':
        assert np.all(np.abs(errs) == 5)

In [None]:
def test_shifted(hypalign, benchmark, shift_sec, hop_sec=512/22050):
    errs = eval_file(hypalign, unit_tests_path / benchmark, unit_tests_path / benchmark, hop_sec)
    
    if benchmark == 'matching' or benchmark == 'subseq10' or benchmark == 'post5':
        assert np.all(np.abs(errs) == shift_sec)
    elif benchmark == 'partialOverlap':
        assert np.all(np.abs(errs) == 41-2) or np.all(np.abs(errs) == 41+2)
    elif benchmark == 'pre5' or benchmark == 'prepost5':
        assert np.all(np.abs(errs) == 5-2) or np.all(np.abs(errs) == 5+2)

**Matching**

In [None]:
benchmark = 'matching'

In [None]:
wp_exact_match = Path(unit_tests_path / 'matching' / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / 'matching' /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)

**Subseq10**

In [None]:
benchmark = 'subseq10'

In [None]:
wp_exact_match = Path(unit_tests_path / 'matching' / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / 'matching' /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)

**PartialOverlap**

In [None]:
benchmark = 'partialOverlap'

In [None]:
wp_exact_match = Path(unit_tests_path / benchmark / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / benchmark /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)

**Pre5**

In [None]:
benchmark = 'pre5'

In [None]:
wp_exact_match = Path(unit_tests_path / benchmark / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / benchmark /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)

**Post5**

In [None]:
benchmark = 'post5'

In [None]:
wp_exact_match = Path(unit_tests_path / benchmark / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / benchmark /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)

**PrePost5**

In [None]:
benchmark = 'prepost5'

In [None]:
wp_exact_match = Path(unit_tests_path / benchmark / 'exact_match' / 'exact_match_annot1__exact_match_annot2.pkl')
wp_shifted_2 = Path(unit_tests_path / benchmark /'shifted_2' / 'shifted_2_annot1__shifted_2_annot2.pkl')

In [None]:
test_exact_match(wp_exact_match, benchmark)

In [None]:
test_shifted(wp_shifted_2, benchmark, 2)