In [None]:
# To re-create corpus_data directory
# from harmonic_inference.data.corpus_reading import aggregate_annotation_dfs
# from pathlib import Path

# ANNOTATIONS_PATH = Path('../corpora/annotations')
# OUT_DIR = Path('corpus_data')

# aggregate_annotation_dfs(ANNOTATIONS_PATH, OUT_DIR)

In [None]:
from harmonic_inference.data.corpus_reading import load_clean_corpus_dfs

# Load the files / measures / chords / notes tables from the 'corpus_data' directory
# (the directory the commented-out aggregate_annotation_dfs cell above writes to).
files_df, measures_df, chords_df, notes_df = load_clean_corpus_dfs('corpus_data')

In [None]:
files_df

In [None]:
notes_df

In [None]:
measures_df

In [None]:
chords_df

In [None]:
"""Training of initial chord prior model"""
import json
from pathlib import Path

import numpy as np

from harmonic_inference.data.piece import Chord
from harmonic_inference.data.data_types import PitchType, KeyMode


# Select the rows whose "chord_id" index level is 0 and turn each one into a Chord.
initial_chords = chords_df.loc[chords_df.index.get_level_values("chord_id") == 0]
chords = [
    Chord.from_series(row, measures_df.loc[file_id], PitchType.TPC)
    for (file_id, chord_id), row in initial_chords.iterrows()
]

major_key_chords = []
minor_key_chords = []

one_hot_length = chords[0].get_chord_vector_length(
    PitchType.TPC,
    one_hot=True,
    relative=True,
    use_inversions=True,
)

# Every bin starts at 1 / one_hot_length (one pseudo-count spread over all bins),
# so no chord ends up with probability exactly 0 after normalization.
norm_factor = 1 / one_hot_length
major_key_chords_one_hots = np.ones(one_hot_length) * norm_factor
minor_key_chords_one_hots = np.ones(one_hot_length) * norm_factor

# Count each initial chord in the histogram of its key mode.
for chord in chords:
    one_hot_index = chord.get_one_hot_index(relative=True, use_inversion=True)

    if chord.key_mode == KeyMode.MAJOR:
        major_key_chords.append(chord)
        major_key_chords_one_hots[one_hot_index] += 1
    else:
        minor_key_chords.append(chord)
        minor_key_chords_one_hots[one_hot_index] += 1

# Normalize
major_key_chords_one_hots /= np.sum(major_key_chords_one_hots)
minor_key_chords_one_hots /= np.sum(minor_key_chords_one_hots)

# open(..., "w") does not create missing parent directories, so make sure the
# checkpoint directory exists before writing.
prior_path = Path("checkpoints", "initial_chord_prior.json")
prior_path.parent.mkdir(parents=True, exist_ok=True)

with open(prior_path, "w") as json_file:
    json.dump(
        {
            "pitch_type": str(PitchType.TPC).split(".")[1],
            "use_inversions": True,
            "major": list(major_key_chords_one_hots),
            "minor": list(minor_key_chords_one_hots),
        },
        json_file,
        indent=4,
    )

In [None]:
import harmonic_inference.data.datasets as ds

# One group of splits is requested per dataset class, in this order.
dataset_classes = [
    ds.ChordTransitionDataset,
    ds.ChordClassificationDataset,
]

# Proportions handed to get_dataset_splits via `splits=`.
split_proportions = [0.8, 0.1, 0.1]

dataset_splits = ds.get_dataset_splits(
    files_df, measures_df, chords_df, notes_df, dataset_classes,
    splits=split_proportions,
    seed=0,
)

In [None]:
# Write data out to h5 files
# `seed` was never bound as a variable (it was only passed as the literal `seed=0`
# to ds.get_dataset_splits), so referencing it here raised NameError on a fresh
# kernel. Keep this value in sync with the one used to create the splits.
seed = 0
split_names = ['train', 'valid', 'test']

# dataset_splits is indexed as [dataset class][split], matching the two orderings above.
for i1, data_type in enumerate(dataset_classes):
    for i2, split in enumerate(split_names):
        h5_path = Path('h5_data', f'{data_type.__name__}_{split}_seed_{seed}.h5')
        dataset_splits[i1][i2].to_h5(h5_path)

In [None]:
import eval_utils as eu
import harmonic_utils as hu
import matplotlib.pyplot as plt

# NOTE(review): `labels` and `outputs` are not bound by any visible cell of this
# notebook; they must already exist in the kernel for this cell to run.
label_strings = hu.get_one_hot_labels()
conf_mat = eu.get_conf_mat(labels, outputs)

# Both axes use one tick per label.
tick_positions = list(range(len(label_strings)))

plt.figure(figsize=(30, 30))
plt.imshow(conf_mat, interpolation='none')
plt.colorbar()
plt.xticks(ticks=tick_positions, labels=label_strings, rotation=90, fontsize=10)
plt.yticks(ticks=tick_positions, labels=label_strings, fontsize=10)
plt.show()

In [None]:
import eval_utils as eu

# NOTE(review): depends on `labels` / `outputs`, which no visible cell defines.
correct, incorrect = eu.get_correct_and_incorrect_indexes(labels, outputs)
print(f'Correct: {len(correct)}')
print(f'Incorrect: {len(incorrect)}')

In [None]:
import eval_utils as eu
    
# Show the first incorrect example. NOTE(review): needs `incorrect` from the previous
# cell and `labels` / `outputs`, which no visible cell defines.
eu.print_result(incorrect[0], labels, outputs, limit=10, prob=False)

In [None]:
import eval_utils as eu

# NOTE(review): `datasets` and `data` are not bound by any visible cell.
chord, onset_notes, all_notes = eu.get_input_df_rows(incorrect[0], datasets[data]['test'])

# One print call; the empty string produces the blank line between the two note listings.
print(chord, "USED NOTES:", onset_notes, "", "ALL NOTES:", all_notes, sep="\n")

In [None]:
import matplotlib.pyplot as plt
import eval_utils as eu

# NOTE(review): depends on `labels` / `outputs`, which no visible cell defines.
correct_ranks, indexes_by_rank = eu.get_correct_ranks(labels, outputs)

# Bar height = number of examples recorded for each rank.
rank_counts = [len(indexes) for indexes in indexes_by_rank]

plt.figure(figsize=(30, 30))
plt.bar(range(len(outputs[0])), rank_counts)

In [None]:
import eval_utils as eu
import importlib
importlib.reload(eu)

# Build the evaluation table for the test split and display it.
# NOTE(review): `labels`, `outputs`, `datasets` and `data` are not bound by any visible cell.
eval_df = eu.get_eval_df(labels, outputs, datasets[data]['test'])
eval_df

In [None]:
import ablation
import importlib
importlib.reload(ablation)

# NOTE(review): `prefix` is not bound by any visible cell.
# The last character of a non-empty prefix is dropped before it is passed on;
# an empty prefix is passed as None.
if len(prefix) > 0:
    ablation_prefix = prefix[:-1]
else:
    ablation_prefix = None

dfs = ablation.load_all_ablated_dfs(directory='results', prefix=ablation_prefix)
_, mask_names = ablation.get_masks_and_names()

In [None]:
import pandas as pd
import os

# One log table per ablation mask, read from results/<prefix><mask_name>.log.
logs = [
    pd.read_csv(os.path.join('results', prefix + mask_name + '.log'))
    for mask_name in mask_names
]

In [None]:
# For each ablation: percentage of rows flagged `correct`, then the last row of its log.
for df, log, mask_name in zip(dfs, logs, mask_names):
    accuracy_pct = 100 * df.correct.sum() / len(df)
    print(f"{mask_name} Acc: {accuracy_pct}")
    print(log.iloc[-1])

In [None]:
import eval_utils as eu

# Evaluation tables of the three un-ablated runs, loaded in the order global / local / none.
eval_paths = (
    'results/global_no_ablation.csv',
    'results/local_no_ablation.csv',
    'results/no_ablation.csv',
)
global_df, local_df, none_df = (eu.load_eval_df(path) for path in eval_paths)

In [None]:
global_df

In [None]:
def _accuracy_by_chord(eval_table):
    """Mean and count of `correct` per `correct_chord`, sorted by descending count."""
    per_chord = eval_table.groupby(['correct_chord'])['correct'].agg(['mean', 'count'])
    return per_chord.sort_values('count', ascending=False)


global_counts = _accuracy_by_chord(global_df)
local_counts = _accuracy_by_chord(local_df)
none_counts = _accuracy_by_chord(none_df)

In [None]:
import matplotlib.pyplot as plt

# Accuracy against frequency for each target chord, one colour per run.
# NOTE(review): the title names only the global-key run although all three are drawn.
scatter_series = (
    (global_counts, 'red', 'Global key'),
    (local_counts, 'blue', 'Local key'),
    (none_counts, 'yellow', 'No transposition'),
)

plt.figure(figsize=(12, 5))
for counts, color, label in scatter_series:
    plt.scatter(counts['count'], counts['mean'], color=color, label=label)
plt.title('Global key transposed')
plt.xlabel('Count')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Start of baseline evaluation

In [None]:
from glob import glob
from pathlib import Path
from typing import Tuple
from fractions import Fraction
from bisect import bisect

import pandas as pd
import numpy as np

from harmonic_inference.utils import eval_utils as eu
from harmonic_inference.utils import harmonic_utils as hu
from harmonic_inference.data.data_types import ChordType, PitchType, KeyMode, TRIAD_REDUCTION

In [None]:
# Baseline outputs keyed by file name; the csv files have no header row.
results = {}
column_names = ['on', 'off', 'key', 'degree', 'type', 'inv']

for file in glob("outputs/baseline/*.csv"):
    baseline_df = pd.read_csv(file, header=None, names=column_names)

    # Output is in quarter notes, labels are in whole notes
    for time_column in ("on", "off"):
        baseline_df[time_column] /= 4

    results[Path(file).name] = baseline_df

In [None]:
# Gather every distinct key / degree / type / inversion value seen in the baseline outputs.
keys, degrees, types, inversions = set(), set(), set(), set()

for df in results.values():
    keys.update(df['key'].unique())
    degrees.update(df['degree'].unique())
    types.update(df['type'].unique())
    inversions.update(df['inv'].unique())

In [None]:
def key_to_tonic_mode(key: str, pitch_type: PitchType = PitchType.TPC) -> Tuple[int, KeyMode]:
    """
    Convert a baseline key label into a tonic pitch and a key mode.

    '-' is rewritten to 'b' and '+' to '#' before the label is handed to
    hu.get_pitch_from_string. The mode is MAJOR when the first character of the
    rewritten label is upper-case, and MINOR otherwise.

    Parameters
    ----------
    key : str
        The key label to convert.
    pitch_type : PitchType
        Passed through to hu.get_pitch_from_string.

    Returns
    -------
    Tuple[int, KeyMode]
        The tonic returned by hu.get_pitch_from_string, and the mode.
    """
    normalized = key.replace('-', 'b').replace('+', '#')
    tonic = hu.get_pitch_from_string(normalized, pitch_type)

    if normalized[0].isupper():
        return tonic, KeyMode.MAJOR
    return tonic, KeyMode.MINOR

In [None]:
def type_to_chord_type(type_str: str) -> ChordType:
    """
    Map a baseline chord-type label to a ChordType.

    Only the eight labels listed below are known; any other label raises KeyError.
    """
    chord_type_by_label = dict(
        [
            ('D7', ChordType.MAJ_MIN7),
            ('M', ChordType.MAJOR),
            ('d', ChordType.DIMINISHED),
            ('d7', ChordType.DIM7),
            ('m', ChordType.MINOR),
            ('m7', ChordType.MIN_MIN7),
            # NOTE(review): 'Gr+6' shares DIM7 with 'd7' - confirm this is intended.
            ('Gr+6', ChordType.DIM7),
            ('h7', ChordType.HALF_DIM7),
        ]
    )
    return chord_type_by_label[type_str]

In [None]:
def get_root_tonic_and_mode(
    degree_str: str, tonic: int, mode: KeyMode, pitch_type: PitchType = PitchType.TPC
) -> Tuple[int, int, KeyMode]:
    """
    Resolve a baseline degree label to a chord root, and update the key if the
    label is of the form "<key>/<degree>".

    Parameters
    ----------
    degree_str : str
        The degree label. Non-string values (Python ints, numpy integer scalars)
        are converted with str() first. '-' is rewritten to 'b' and '+' to '#'.
    tonic : int
        Tonic of the current key.
    mode : KeyMode
        Mode of the current key.
    pitch_type : PitchType
        Passed through to the harmonic_utils helpers.

    Returns
    -------
    Tuple[int, int, KeyMode]
        The chord root, and the (possibly changed) tonic and mode.

    Raises
    ------
    ValueError
        If the label contains more than one '/'.
    """
    # A plain isinstance(..., int) check misses numpy integer scalars (numpy.int64 is
    # not a subclass of int), which would then fail on .replace below.
    if not isinstance(degree_str, str):
        degree_str = str(degree_str)

    degree_str = degree_str.replace('-', 'b').replace('+', '#')

    if '/' in degree_str:
        # The part before the slash names the key, the part after it the degree.
        key, degree_str = degree_str.split('/')

        # The interval to the new tonic is computed with the incoming mode, before
        # the mode is switched below.
        relative_transposition = hu.get_interval_from_scale_degree(key, False, mode, pitch_type=pitch_type)
        tonic = hu.transpose_pitch(tonic, relative_transposition, pitch_type=pitch_type)

        if key == '5':
            mode = KeyMode.MAJOR
        elif key == '7':
            mode = KeyMode.MINOR
        # Every other key label keeps the incoming mode.
        # NOTE(review): confirm that is intended for labels other than '1'.

    degree_interval = hu.get_interval_from_scale_degree(degree_str, False, mode, pitch_type=pitch_type)
    root = hu.transpose_pitch(tonic, degree_interval, pitch_type=pitch_type)

    return root, tonic, mode

In [None]:
def get_all(key: str, degree: str, type_str: str, inv: str) -> Tuple[int, ChordType, int, int, KeyMode]:
    """
    Convert the four label fields of one baseline row into chord and key values.

    Returns
    -------
    Tuple[int, ChordType, int, int, KeyMode]
        (root, chord type, inversion, tonic, mode), where tonic and mode are the
        values returned by get_root_tonic_and_mode.
    """
    inversion = int(inv)
    chord_type = type_to_chord_type(type_str)

    key_tonic, key_mode = key_to_tonic_mode(key)
    root, local_tonic, local_mode = get_root_tonic_and_mode(degree, key_tonic, key_mode)

    return root, chord_type, inversion, local_tonic, local_mode

In [None]:
# Column names, in the same order as the tuple returned by get_all.
derived_columns = ("root_tpc", "chord_type", "inversion", "tonic", "mode")

for df in results.values():
    # Convert every row first, then attach the results as new columns.
    converted_rows = [
        get_all(row['key'], row['degree'], row['type'], row['inv'])
        for _, row in df.iterrows()
    ]

    for position, column in enumerate(derived_columns):
        df[column] = [converted[position] for converted in converted_rows]

In [None]:
def get_label_df(filename: str) -> pd.DataFrame:
    """
    Load the label table that belongs to one baseline output file.

    The last 21 characters of `filename` are replaced by "results.tsv", and the
    first file with that name found anywhere under outputs/results-csm-1-kse-75
    is read. NOTE(review): the 21 presumably matches the baseline file suffix - confirm.

    Raises
    ------
    IndexError
        If no matching file is found.
    """
    label_filename = filename[:-21] + "results.tsv"
    matches = glob(f'outputs/results-csm-1-kse-75/**/{label_filename}', recursive=True)
    first_match = matches[0]

    return pd.read_csv(first_match, sep='\t', index_col=0, converters={'duration': Fraction})

In [None]:
def get_row_at_onset(df, onset):
    """
    Return the first row of `df` whose 'off' value is strictly greater than `onset`.

    If no row qualifies, the last row is returned.

    The lookup is a binary search, so the 'off' column must be sorted in ascending
    order. Searching the column directly avoids copying it into a list on every call.

    Parameters
    ----------
    df : pd.DataFrame
        Table with an 'off' column.
    onset : number
        The position to look up; converted with float() (a Fraction is accepted).

    Returns
    -------
    pd.Series
        The selected row.
    """
    # side='right' matches bisect.bisect: rows whose 'off' equals onset are skipped.
    position = df['off'].searchsorted(float(onset), side='right')
    index = min(position, len(df) - 1)
    return df.iloc[index]

In [None]:
def evaluate_df(key, df):
    """
    Score one baseline output table against its label file.

    Each label row is compared with the estimate row that get_row_at_onset picks
    for the running onset (the sum of the durations of the preceding label rows).
    Each per-row accuracy is 1 minus the corresponding distance from eval_utils.

    Parameters
    ----------
    key : str
        Name of the baseline output file; used by get_label_df to find the labels.
    df : pd.DataFrame
        The baseline estimates. The columns 'root_tpc', 'chord_type', 'inversion',
        'tonic' and 'mode' are read here.

    Returns
    -------
    dict
        Duration-weighted average accuracies:
        "Triad"   - chord accuracy, inversions set to 0, reduction=TRIAD_REDUCTION.
        "Seventh" - chord accuracy, inversions set to 0.
        "Chord"   - chord accuracy including inversion.
        "Key"     - key accuracy.
        "Full"    - 1 where chord distance plus key distance is 0, else 0.
    """
    label_df = get_label_df(key)
    durations = label_df['duration']
    accs = {"Triad": [], "Seventh": [], "Chord": [], "Key": [], "Full": []}

    onset = 0
    for _, label_row in label_df.iterrows():
        est_row = get_row_at_onset(df, onset)
        onset += label_row['duration']

        # Ground-truth key: tonic is the text before ':' (and before any '/');
        # the mode comes from the case of the first character.
        gt_key = label_row['gt_key']
        gt_tonic = hu.get_pitch_from_string(
            gt_key.split(':')[0].split('/')[0], pitch_type=PitchType.TPC
        )
        gt_mode = KeyMode.MAJOR if gt_key[0].isupper() else KeyMode.MINOR

        # Ground-truth chord: root before ':', type after it (up to any ','),
        # inversion is the last character.
        gt_chord = label_row['gt_chord']
        gt_inv = int(gt_chord[-1])
        gt_root = hu.get_pitch_from_string(
            gt_chord.split(':')[0].split('/')[0], pitch_type=PitchType.TPC
        )
        gt_chord_type = hu.get_chord_type_from_string(gt_chord.split(':')[1].split(',')[0])

        est_root = est_row['root_tpc']
        est_chord_type = est_row['chord_type']

        chord_dist = eu.get_chord_distance(
            gt_root, gt_chord_type, gt_inv,
            est_root, est_chord_type, est_row['inversion'],
        )
        triad_dist = eu.get_chord_distance(
            gt_root, gt_chord_type, 0,
            est_root, est_chord_type, 0,
            reduction=TRIAD_REDUCTION,
        )
        seventh_dist = eu.get_chord_distance(
            gt_root, gt_chord_type, 0,
            est_root, est_chord_type, 0,
        )
        key_dist = eu.get_key_distance(
            gt_tonic, gt_mode,
            est_row['tonic'], est_row['mode'],
        )

        accs["Chord"].append(1 - chord_dist)
        accs["Triad"].append(1 - triad_dist)
        accs["Seventh"].append(1 - seventh_dist)
        accs["Key"].append(1 - key_dist)
        accs["Full"].append(1 if chord_dist + key_dist == 0 else 0)

    # Dict order (Triad, Seventh, Chord, Key, Full) is the order callers iterate in.
    return {
        name: float(np.average(values, weights=durations))
        for name, values in accs.items()
    }

In [None]:
results_vals = {}
import re

# Only file names starting with two digits, '-', one digit and '_inf' are evaluated.
beethoven_pattern = re.compile(r"[0-9][0-9]-[0-9]_inf")

for key, df in results.items():
    # Beethoven match
    if beethoven_pattern.match(key) is None:
        continue

    print(key)
    for acc, val in evaluate_df(key, df).items():
        # Collect each metric across files for the averages computed afterwards.
        results_vals.setdefault(acc, []).append(val)
        print(f"    {acc}: {val}")

In [None]:
# Unweighted mean of each metric over the evaluated files.
for acc, val_list in results_vals.items():
    mean_val = sum(val_list) / len(val_list)
    print(f"{acc}: {mean_val}")

In [None]:
from pathlib import Path
from fractions import Fraction

import pandas as pd
from music21.converter import parse

In [None]:
# Parse the score, then flatten its parts and strip ties.
score_path = Path("../functional-harmony/data/BPS/scores/bps_01_01.mxl")
m21_score = parse(score_path).flattenParts().stripTies()

In [None]:
# Print every note of the score; the members of a chord are listed between
# "Chord" / "End Chord" markers, using the chord's duration, offset and measure.
for element in m21_score.recurse().notes:
    if element.isChord:
        chord = element
        print("Chord")
        # A separate name for the chord members keeps the outer loop variable intact.
        for chord_note in chord.notes:
            print(
                chord_note.pitch.name,
                chord_note.pitch.octave,
                chord.duration.quarterLength,
                chord.offset,
                chord.measureNumber,
                chord_note.tie,
                chord.tie,
            )
        print("End Chord")
    else:
        # The original line was missing its closing parenthesis (SyntaxError).
        print(element.offset)
        print(
            element.pitch.name,
            element.pitch.octave,
            element.duration.quarterLength,
            element.offset,
            element.measureNumber,
        )

In [None]:
# For every measure offset, show the time signature of the first measure at that offset.
for offset, measures_at_offset in m21_score.measureOffsetMap().items():
    first_measure = measures_at_offset[0]
    print(offset, first_measure.timeSignature)

In [None]:
import importlib
from pathlib import Path
import harmonic_inference.data.piece as piece
importlib.reload(piece)

In [None]:
# Load one score, passing an empty string as the second argument.
# NOTE(review): this rebinds `measures_df`, which the corpus cells earlier in the
# notebook also use - re-running those cells after this one will see this value.
# NOTE(review): the return value is unpacked into two names here, while the last
# cell of the notebook binds it to a single name - confirm the return shape.
notes, measures_df = piece.get_score_piece_from_music_xml(Path("../functional-harmony/data/BPS/scores/bps_01_01.mxl"), "")

In [None]:
measures_df[40:50]

In [None]:
# Notes whose first onset component is 48 or 49.
[note for note in notes if note.onset[0] in [48, 49]]

# Test loading functional-harmony data

In [None]:
from glob import glob
from pathlib import Path

import harmonic_inference.data.piece as piece

In [None]:
import importlib
importlib.reload(piece)

In [None]:
# Load every .mxl score together with the same-named .csv file in the sibling
# "chords" directory (two levels up from the score, then into chords/).
for file_path in glob("../functional-harmony/data/**/*.mxl", recursive=True):
    music_xml_path = Path(file_path)
    chords_dir = music_xml_path.parent.parent / "chords"
    label_csv_path = chords_dir / f"{music_xml_path.stem}.csv"

    score = piece.get_score_piece_from_music_xml(music_xml_path, label_csv_path)