In [2]:
import numpy as np
from scipy.ndimage import gaussian_filter1d
from scipy.signal import medfilt

from numpy.fft import fft, fftshift, fftfreq
from matplotlib import pyplot as plt
from pathlib import Path
import importlib

In [3]:
import helpers
importlib.reload(helpers)

# Driver
class Ex:
    """Configuration record for one recorded exercise.

    Attributes:
        name: File name of the recording (looked up under ``exercises/`` by the
            functions in this notebook).
        num_notes: Number of notes the recording is expected to contain.
        spec_thresh: Spectral-flux threshold used for onset detection.
        bpm: Tempo of the recording in beats per minute.
        max_num_notes_per_beat: Largest number of notes played within one beat.
        min_time_between: Shortest expected gap between two notes, in seconds
            (``60 / (bpm * max_num_notes_per_beat)``). Computed once here; it
            is NOT refreshed if ``bpm`` or ``max_num_notes_per_beat`` are
            reassigned later.
        sustain_thresh_coeff: Multiplier applied to ``spec_thresh`` to obtain
            the sustain threshold; starts at 0 and is filled in by the
            optimisation step.
    """

    def __init__(self, name: str, num_notes: int, spec_thresh: float, bpm: int, max_num_notes_per_beat: int):
        self.name = name
        self.num_notes = num_notes
        self.spec_thresh = spec_thresh
        self.bpm = bpm
        self.max_num_notes_per_beat = max_num_notes_per_beat
        self.min_time_between = 60 / (self.bpm * max_num_notes_per_beat)
        self.sustain_thresh_coeff = 0

    def __repr__(self) -> str:
        # Without this, printing a list of exercises only shows object addresses.
        return (
            f"Ex(name={self.name!r}, num_notes={self.num_notes}, "
            f"spec_thresh={self.spec_thresh}, bpm={self.bpm}, "
            f"max_num_notes_per_beat={self.max_num_notes_per_beat}, "
            f"sustain_thresh_coeff={self.sustain_thresh_coeff})"
        )

def getExercises():
    """Build the list of ``Ex`` records describing every recorded exercise.

    Returns:
        A new list of ``Ex`` objects, one per recording, in a fixed order.
    """
    # Columns: file name, note count, onset threshold, bpm, max notes per beat.
    catalogue = (
        ("ex1WholeMod.mp4", 90, .15, 80, 2),
        ("ex1WholeModF.mp4", 90, .15, 120, 2),
        ("ex2WholeMod.mp4", 49, .15, 80, 1),
        ("ex3WholeMod.mp4", 145, .15, 100, 2),
        ("ex3WholeModF.mp4", 145, .15, 130, 2),
        ("ex4WholeMod.mp4", 102, .15, 90, 4),
        ("ex4WholeModF.mp4", 102, .05, 120, 2),
        ("ex5WholeMod.mp4", 133, .15, 64, 2),
        ("ex5WholeModF.mp4", 133, .15, 86, 2),
        ("ex6WholeMod.mp4", 118, .15, 70, 4),
        ("ex6WholeModF.mp4", 118, .15, 90, 4),
        ("ex7WholeMod.mp4", 86, .15, 70, 2),
        ("ex8WholeMod.mp4", 112, .15, 55, 4),
        ("ex8WholeModF.mp4", 112, .15, 80, 4),
        ("ex9WholeMod.mp4", 121, .15, 100, 4),
        ("ex10WholeMod.mp4", 102, .15, 120, 3),
    )

    exercises = []
    for file_name, notes, thresh, tempo, per_beat in catalogue:
        exercises.append(
            Ex(file_name, num_notes=notes, spec_thresh=thresh, bpm=tempo, max_num_notes_per_beat=per_beat)
        )
    return exercises

def observe_accuracy(exercises):
    """Run onset/sustain detection on each exercise, report accuracy, save plots.

    For every exercise this:
      * loads the audio and derives spectral flux and spectral centroid via
        ``helpers``,
      * saves a spectral-flux plot under ``spec_flux_graphs/``,
      * prints the expected vs. detected note count and their ratio,
      * prints the mean articulation length (sustain time minus onset time),
      * saves an onset + sustain + centroid plot under ``centroids/``.

    Finally it prints the average of the per-exercise mean articulation
    lengths, taken over the exercises that produced at least one
    onset/sustain pair.

    Args:
        exercises: Sequence of ``Ex``-like objects exposing ``name``,
            ``num_notes``, ``spec_thresh``, ``min_time_between`` and
            ``sustain_thresh_coeff``.

    Returns:
        None. Results are printed and written to image files.
    """
    # Reporting switches; they do not vary per exercise, so set them once.
    print_num_onset_detection_accuracy = True
    graph_onsets_sustains = True

    if len(exercises) == 0:
        # Nothing to evaluate; avoids dividing by zero in the final average.
        print("No exercises to evaluate.")
        return

    def _count_accuracy(expected, detected):
        # Symmetric ratio in [0, 1]; 0 when either count is zero (no division by zero).
        if expected <= 0 or detected <= 0:
            return 0.0
        return min(expected / detected, detected / expected)

    # savefig does not create missing directories, so make sure they exist.
    spec_flux_dir = Path.cwd() / "spec_flux_graphs"
    centroid_dir = Path.cwd() / "centroids"
    spec_flux_dir.mkdir(parents=True, exist_ok=True)
    centroid_dir.mkdir(parents=True, exist_ok=True)

    avg_artic_leng = 0
    num_with_articulations = 0

    for ex in exercises:
        exercise = ex.name
        note_count = ex.num_notes
        spec_thresh = ex.spec_thresh
        # Allow notes to arrive 20% sooner than the nominal minimum spacing.
        min_time_between = ex.min_time_between * .8
        sustain_thresh = ex.sustain_thresh_coeff * spec_thresh

        ys, ts, sr = helpers.get_audio_data(f"exercises/{exercise}")
        ts_fr, fr_freq_amps, freq_bins = helpers.magnitude_spectrogram(ys, ts, sr)
        ts_fr, spec_flux = helpers.compute_spectral_flux(ts_fr, fr_freq_amps, sr)

        centroids = helpers.compute_spectral_centroid(time_frames=ts_fr, freq_bins=freq_bins, frame_freq_amps=fr_freq_amps)

        plt.title(exercise)
        plt.xlabel("Time (s)"); plt.ylabel("Spectral Flux")
        plt.plot(ts_fr, spec_flux, c="orange")
        img_path = spec_flux_dir / f"{exercise} Spectral Flux.png"
        plt.savefig(img_path)
        plt.close()

        onsets, sustains = helpers.detect_onsets_and_release(spec_flux=spec_flux, times=ts_fr, sr=sr, sustain_thresh=sustain_thresh, onset_thresh=spec_thresh, min_time_between=min_time_between)

        if print_num_onset_detection_accuracy:
            accuracy = _count_accuracy(note_count, len(onsets))
            print(f"For exercise {exercise}, Note count Actual:\t{note_count}, Note count Detected: \t{len(onsets)} | Accuracy: \t{accuracy}")

        if graph_onsets_sustains:
            # Pair onsets and sustains positionally; unpaired trailing entries are ignored.
            artic_lens = [sustain - onset for onset, sustain in zip(onsets, sustains)]
            if artic_lens:
                mean_artic_len = np.average(artic_lens)
                print(f"For {exercise}, mean articulation length: {mean_artic_len}")
                avg_artic_leng += mean_artic_len
                num_with_articulations += 1
            else:
                # Averaging an empty list would yield NaN and poison the overall mean.
                print(f"For {exercise}, no onset/sustain pairs were detected")

            onset_indic = np.zeros_like(ts_fr)
            sustain_indic = np.zeros_like(ts_fr)

            # Stems are drawn as tall as the largest centroid so they span the plot.
            stem_top = np.max(centroids)

            # Mark the frame closest to each onset/sustain time with a full-height stem.
            for onset_time in onsets:
                idx = np.argmin(np.abs(ts_fr - onset_time))
                onset_indic[idx] = stem_top

            for sustain_time in sustains:
                idx = np.argmin(np.abs(ts_fr - sustain_time))
                sustain_indic[idx] = stem_top

            plt.title(f"{exercise} Onset + Sustain + Spectral Centroid")
            plt.xlim(10, 14)  # only the 10 s - 14 s excerpt is shown
            plt.xlabel("Times (s)")
            plt.stem(ts_fr, onset_indic, linefmt='--', markerfmt='pink', label='Onsets')
            plt.stem(ts_fr, sustain_indic, linefmt='--', markerfmt='red', label='Sustains')
            plt.legend()
            img_path = centroid_dir / f"{exercise} centroid.png"
            plt.plot(ts_fr, centroids)
            plt.savefig(img_path)
            plt.close()

    if num_with_articulations:
        print(f"Average articulation window size is {avg_artic_leng / num_with_articulations}")
    else:
        print("Average articulation window size is undefined (no onset/sustain pairs detected)")

# observe_accuracy()

In [4]:
importlib.reload(helpers)

def optimize_spec_thresh():
    """Sweep the onset threshold for every exercise and keep the best value.

    For each exercise returned by ``getExercises()`` the spectral flux is
    computed once, then ``helpers.detect_onsets_only`` is run for 100 evenly
    spaced thresholds in [0.01, 0.25]. Accuracy is the symmetric ratio between
    the expected and detected note counts. A plot of threshold vs. detected
    onsets is saved under ``spec_thresh_opt/graphs/`` and the best thresholds
    are appended to ``spec_thresh_opt/optimized_values.txt``.

    Returns:
        The list of ``Ex`` objects with ``spec_thresh`` replaced by the middle
        of the best-performing thresholds, floored at 0.08.
    """
    lowest, highest, num_tests = .01, .25, 100
    spec_thres_vals = np.linspace(lowest, highest, num_tests)
    opt_spec_thresh = [] # the optimized thresholds for each exercise

    exerciseNoteCounts = getExercises()

    # savefig/open do not create missing directories, so make sure they exist.
    out_dir = Path.cwd() / "spec_thresh_opt"
    graph_dir = out_dir / "graphs"
    graph_dir.mkdir(parents=True, exist_ok=True)

    for ex in exerciseNoteCounts:
        exercise = ex.name
        note_count = ex.num_notes
        # Allow notes to arrive 20% sooner than the nominal minimum spacing.
        min_time_between = ex.min_time_between * .8

        num_onsets_detected = []
        accuracies = []

        ys, ts, sr = helpers.get_audio_data(f"exercises/{exercise}")
        ts_fr, fr_freq_amps, _ = helpers.magnitude_spectrogram(ys, ts, sr)
        ts_fr, spec_flux = helpers.compute_spectral_flux(ts_fr, fr_freq_amps, sr)

        for spec_thresh in spec_thres_vals:
            onsets = helpers.detect_onsets_only(spec_flux=spec_flux, times=ts_fr, sr=sr, onset_thresh=spec_thresh, min_time_between=min_time_between)
            num_detected = len(onsets)
            # A high threshold can detect nothing at all; score that as 0
            # instead of dividing by zero.
            if num_detected == 0 or note_count == 0:
                accuracy = 0.0
            else:
                accuracy = min(note_count / num_detected, num_detected / note_count)

            num_onsets_detected.append(num_detected)
            accuracies.append(accuracy)

        plt.title(f"{exercise} Spectral Flux Threshold vs Number of Onsets Detected")
        plt.xlabel("Spectral Flux Threshold"); plt.ylabel("Number of Onsets Detected")
        plt.plot(spec_thres_vals, num_onsets_detected, label="Threshold vs Onsets")
        plt.axhline(y=note_count, color='limegreen', linestyle='--', linewidth=2, label="Correct # Onsets")
        plt.legend()
        img_path = graph_dir / f"{exercise}.png"
        plt.savefig(img_path)
        plt.close()

        opt_perf = max(accuracies)
        opt_perf_spec_thresh = [spec_thres_vals[i] for i, val in enumerate(accuracies) if val == opt_perf]
        # Append mode: results from earlier runs are kept in the log.
        with open(out_dir / "optimized_values.txt", "a") as f:
            f.write(f"{'=' * 30}\n")
            f.write(f"Best performing spectral thresholds at {opt_perf * 100}% accuracy for {exercise}:\n")
            f.write(f"{opt_perf_spec_thresh}\n\n")

        opt_spec_thresh.append(opt_perf_spec_thresh[len(opt_perf_spec_thresh) // 2]) # choose the middle most successful output to maximize applicability

    minimum_thresh = .08 # optimizing for the best threshold for this case hurts sustain onset detection, so this ensures a decent middle ground.
    for ex, best_thresh in zip(exerciseNoteCounts, opt_spec_thresh):
        ex.spec_thresh = max(best_thresh, minimum_thresh)

    return exerciseNoteCounts

# Tune the onset threshold of every exercise, then report the chosen values.
opt_exercises = optimize_spec_thresh()
for tuned in opt_exercises:
    report_line = f"for exercise {tuned.name}, note onset spectral threshold is {tuned.spec_thresh}"
    print(report_line)
print(opt_exercises)

for exercise ex1WholeMod.mp4, note onset spectral threshold is 0.11666666666666665
for exercise ex1WholeModF.mp4, note onset spectral threshold is 0.21606060606060606
for exercise ex2WholeMod.mp4, note onset spectral threshold is 0.14333333333333334
for exercise ex3WholeMod.mp4, note onset spectral threshold is 0.1578787878787879
for exercise ex3WholeModF.mp4, note onset spectral threshold is 0.10454545454545454
for exercise ex4WholeMod.mp4, note onset spectral threshold is 0.19424242424242424
for exercise ex4WholeModF.mp4, note onset spectral threshold is 0.08757575757575757
for exercise ex5WholeMod.mp4, note onset spectral threshold is 0.14333333333333334
for exercise ex5WholeModF.mp4, note onset spectral threshold is 0.1384848484848485
for exercise ex6WholeMod.mp4, note onset spectral threshold is 0.11424242424242424
for exercise ex6WholeModF.mp4, note onset spectral threshold is 0.14333333333333334
for exercise ex7WholeMod.mp4, note onset spectral threshold is 0.14575757575757575
f

In [5]:
importlib.reload(helpers)

def optimize_sustain_thresh_coeff(exercises):
    """Sweep the sustain-threshold coefficient for every exercise.

    For each exercise the spectral flux is computed once, then
    ``helpers.detect_onsets_and_release`` is run for 100 evenly spaced
    coefficients in [0, 1.5], using ``spec_thresh * coeff`` as the sustain
    threshold. "Saturation" is the number of sustains found divided by the
    expected note count. A plot is saved under ``sustain_thresh_opt/graphs/``
    and the smallest coefficient reaching the highest saturation is appended
    to ``sustain_thresh_opt/optimized_values.txt``.

    Args:
        exercises: Sequence of ``Ex``-like objects; each one's
            ``sustain_thresh_coeff`` is overwritten in place.

    Returns:
        The same ``exercises`` object that was passed in, with every
        ``sustain_thresh_coeff`` set to 1.1x the smallest best coefficient.
    """
    lowest, highest, num_tests = 0, 1.5, 100
    sustain_thresh_coeffs = np.linspace(lowest, highest, num_tests)
    opt_coeffs = [] # the optimized coefficients for each exercise

    for ex in exercises:
        exercise = ex.name
        note_count = ex.num_notes
        spec_thresh = ex.spec_thresh
        # Allow notes to arrive 20% sooner than the nominal minimum spacing.
        min_time_between = ex.min_time_between * .8

        sustain_saturations = []

        ys, ts, sr = helpers.get_audio_data(f"exercises/{exercise}")
        ts_fr, fr_freq_amps, _ = helpers.magnitude_spectrogram(ys, ts, sr)
        ts_fr, spec_flux = helpers.compute_spectral_flux(ts_fr, fr_freq_amps, sr)

        for coeff in sustain_thresh_coeffs:
            sustain_thresh = spec_thresh * coeff
            _, sustains = helpers.detect_onsets_and_release(spec_flux=spec_flux, times=ts_fr, sr=sr, onset_thresh=spec_thresh, sustain_thresh=sustain_thresh, min_time_between=min_time_between)
            sustain_saturations.append(len(sustains) / note_count)

        plt.title(f"{exercise} Coefficient for Sustain vs Onset Pairing Completeness")
        plt.xlabel("Coefficient for Sustain"); plt.ylabel("Onset Pairing Completeness")
        plt.plot(sustain_thresh_coeffs, sustain_saturations)
        # savefig/open do not create missing directories, so make sure they exist.
        out_dir = Path.cwd() / "sustain_thresh_opt"
        graph_dir = out_dir / "graphs"
        graph_dir.mkdir(parents=True, exist_ok=True)
        img_path = graph_dir / f"{exercise}.png"
        plt.savefig(img_path)
        plt.close()

        # Compute the maximum once rather than once per element of the list.
        best_saturation = max(sustain_saturations)
        opt_perf_sustain_coeff = [sustain_thresh_coeffs[i] for i, val in enumerate(sustain_saturations) if val == best_saturation]
        min_best_coeff = np.min(opt_perf_sustain_coeff)

        # Append mode: results from earlier runs are kept in the log.
        with open(out_dir / "optimized_values.txt", "a") as f:
            f.write(f"{'=' * 30}\n")
            f.write(f"Minimum coefficient for complete onset + sustain pairing for {exercise}:\n")
            f.write(f"{min_best_coeff}\n\n")

        opt_coeffs.append(min_best_coeff * 1.1) # choose the minimum most successful output which is closest to ideal best, with a 10% margin

    for ex, coeff in zip(exercises, opt_coeffs):
        ex.sustain_thresh_coeff = coeff

    return exercises

# Tune the sustain coefficient on top of the exercises whose onset thresholds
# were already tuned (rebinding opt_exercises to the returned list).
opt_exercises = optimize_sustain_thresh_coeff(opt_exercises)

# Re-run the accuracy report and plots with the tuned parameters.
observe_accuracy(opt_exercises)

For exercise ex1WholeMod.mp4, Note count Actual:	90, Note count Detected: 	90 | Accuracy: 	1.0
For ex1WholeMod.mp4, mean articulation length: 0.043111111111111294
For exercise ex1WholeModF.mp4, Note count Actual:	90, Note count Detected: 	90 | Accuracy: 	1.0
For ex1WholeModF.mp4, mean articulation length: 0.03333333333333348
For exercise ex2WholeMod.mp4, Note count Actual:	49, Note count Detected: 	49 | Accuracy: 	1.0
For ex2WholeMod.mp4, mean articulation length: 0.03877551020408123
For exercise ex3WholeMod.mp4, Note count Actual:	145, Note count Detected: 	145 | Accuracy: 	1.0
For ex3WholeMod.mp4, mean articulation length: 0.038482758620689596
For exercise ex3WholeModF.mp4, Note count Actual:	145, Note count Detected: 	145 | Accuracy: 	1.0
For ex3WholeModF.mp4, mean articulation length: 0.03806896551724144
For exercise ex4WholeMod.mp4, Note count Actual:	102, Note count Detected: 	102 | Accuracy: 	1.0
For ex4WholeMod.mp4, mean articulation length: 0.03754901960784308
For exercise ex4

In [6]:
# Write a human-readable summary of the tuned parameters, one section per exercise.
with open("opt_results.txt", "w") as f:
    f.write("FINAL OPTIMIZED PARAMETERS FOR EACH EXERCISE\n\n")
    for ex in opt_exercises:
        # Absolute sustain threshold = onset threshold scaled by the tuned coefficient.
        sustain_thresh_value = ex.spec_thresh * ex.sustain_thresh_coeff
        f.writelines((
            f"{'=' * 50}\n",
            f"Name:                             {ex.name:>15}\n",
            f"Note Count:                       {ex.num_notes:>15d}\n",
            f"Spectral Threshold:               {ex.spec_thresh:>15.4f}\n",
            f"BPM:                              {ex.bpm:>15d}\n",
            f"Max Notes Per Beat:               {ex.max_num_notes_per_beat:>15d}\n",
            f"Min Time Between Notes:           {ex.min_time_between:>15.4f} seconds\n",
            f"Sustain Threshold Coefficient:    {ex.sustain_thresh_coeff:>15.4f}\n",
            f"Sustain Threshold Value:          {sustain_thresh_value:>15.4f}\n\n",
        ))

In [8]:
importlib.reload(helpers)

from bisect import bisect_left
import os, json

def pack_articulations(data, out_dir):
    """Pack per-articulation series into two float32 blobs plus a JSON manifest.

    Writes three files into ``out_dir`` (created if missing):
      * ``pack.amps.f32``  - all amplitude series concatenated, little-endian float32
      * ``pack.cents.f32`` - all centroid series concatenated, little-endian float32
      * ``pack.json``      - manifest giving, for every articulation, its onset
        and sustain time and the element offset/length of its slice in each blob

    Args:
        data: Sequence of exercises; each exercise is a 4-item sequence
            ``[onsets, sustains, amps, centroids]`` where ``onsets`` and
            ``sustains`` are sequences of numbers and ``amps`` / ``centroids``
            are sequences of numeric series, all of the same length.
        out_dir: Directory that receives the three output files.

    Raises:
        ValueError: If an exercise is not a 4-item sequence or its four lists
            differ in length. Files already written for earlier exercises are
            left on disk, but both binary files are closed.
    """
    os.makedirs(out_dir, exist_ok=True)

    amps_path  = os.path.join(out_dir, "pack.amps.f32")
    cents_path = os.path.join(out_dir, "pack.cents.f32")

    manifest = {
        "amps":  { "file": "pack.amps.f32",  "dtype": "float32" },
        "cents": { "file": "pack.cents.f32", "dtype": "float32" },
        "exercises": []
    }

    amp_off = 0   # element offsets (float32), not byte offsets
    cent_off = 0

    # The context manager guarantees both files are closed even when a
    # validation error is raised part-way through the data.
    with open(amps_path, "wb") as f_amps, open(cents_path, "wb") as f_cents:
        for ex_idx, ex in enumerate(data):
            try:
                onsets, sustains, amps_list, cents_list = ex
            except (TypeError, ValueError) as e:
                raise ValueError(f"Exercise {ex_idx} must be [onsets, sustains, amps, centroids]") from e

            nA = len(onsets)
            if not (len(sustains) == len(amps_list) == len(cents_list) == nA):
                raise ValueError(
                    f"Exercise {ex_idx}: length mismatch "
                    f"(onsets={len(onsets)}, sustains={len(sustains)}, "
                    f"amps={len(amps_list)}, cents={len(cents_list)})"
                )

            ex_entry = { "id": f"ex{ex_idx}", "articulations": [] }

            for a_idx in range(nA):
                # Plain floats so json can serialise numpy scalars.
                on_t = float(onsets[a_idx])
                sus_t = float(sustains[a_idx])

                # Convert series to little-endian float32 regardless of host byte order.
                amp_arr  = np.asarray(amps_list[a_idx],  dtype=np.float32).astype('<f4', copy=False)
                cent_arr = np.asarray(cents_list[a_idx], dtype=np.float32).astype('<f4', copy=False)

                # Append raw bytes to the blobs.
                amp_arr.tofile(f_amps)
                cent_arr.tofile(f_cents)

                ex_entry["articulations"].append({
                    "id": f"ex{ex_idx}_a{a_idx:03d}",
                    "onset_time":   on_t,
                    "sustain_time": sus_t,
                    "amp_off":  int(amp_off),
                    "amp_len":  int(amp_arr.size),
                    "cent_off": int(cent_off),
                    "cent_len": int(cent_arr.size),
                })

                amp_off  += amp_arr.size
                cent_off += cent_arr.size

            manifest["exercises"].append(ex_entry)

    with open(os.path.join(out_dir, "pack.json"), "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)

def create_bin(exercises, out_dir="precomputed_target_bin", graph_size=2 ** 10, smooth_window_size=15):
    """Extract per-articulation amplitude/centroid series and pack them to disk.

    For every exercise the audio is loaded and analysed via ``helpers``; for
    each onset/sustain pair, ``graph_size`` evenly spaced samples are taken
    between the two times from the raw signal (then smoothed with a moving
    average) and from the spectral-centroid track. The collected data is
    handed to ``pack_articulations``.

    Args:
        exercises: Sequence of ``Ex``-like objects exposing ``name``,
            ``spec_thresh``, ``min_time_between`` and ``sustain_thresh_coeff``.
        out_dir: Output directory passed to ``pack_articulations``.
        graph_size: Number of points sampled per articulation (AB project
            graph length in pixels, for efficient compression).
        smooth_window_size: Width of the moving-average window applied to the
            sampled signal values.
    """
    def _clamped_index(times, t):
        # bisect_left returns len(times) when t is past the last entry;
        # clamp so the result is always a valid index.
        return min(bisect_left(times, t), len(times) - 1)

    smoothing_kernel = np.ones(smooth_window_size) / smooth_window_size

    # format: a list for each exercise, each element is [[onsets], [sustains], [amps], [centroids]]
    # onsets is a list of onsets
    # sustains is a list of sustains
    # amps is a list of list of amplitudes corresponding to each onset/sustain
    # centroids is a list of list of centroids corresponding to each onset/sustain
    data = []
    for ex in exercises:
        exercise = ex.name
        spec_thresh = ex.spec_thresh
        # Allow notes to arrive 20% sooner than the nominal minimum spacing.
        min_time_between = ex.min_time_between * .8
        sustain_thresh = ex.sustain_thresh_coeff * spec_thresh

        # NOTE(review): ys is indexed with a 1-D index array below, so it is
        # presumably a 1-D (mono) sample array - confirm against helpers.
        ys, ts, sr = helpers.get_audio_data(f"exercises/{exercise}")
        ts_fr, fr_freq_amps, freq_bins = helpers.magnitude_spectrogram(ys, ts, sr)
        ts_fr, spec_flux = helpers.compute_spectral_flux(ts_fr, fr_freq_amps, sr)

        centroids = helpers.compute_spectral_centroid(time_frames=ts_fr, freq_bins=freq_bins, frame_freq_amps=fr_freq_amps)
        onsets, sustains = helpers.detect_onsets_and_release(spec_flux=spec_flux, times=ts_fr, sr=sr, sustain_thresh=sustain_thresh, onset_thresh=spec_thresh, min_time_between=min_time_between)

        data_onsets, data_sustains, data_smoothed_amps, data_centroids = [], [], [], []
        # Pair onsets and sustains positionally; unpaired trailing entries are ignored.
        for onset_t, sustain_t in zip(onsets, sustains):
            # Sample-domain window: graph_size sample indices from onset to sustain.
            onset_i = _clamped_index(ts, onset_t)
            sustain_i = _clamped_index(ts, sustain_t)
            amps_window = np.linspace(onset_i, sustain_i, num=graph_size, dtype=int)
            # smoothed amplitudes in this window
            smoothed_amps = np.convolve(ys[amps_window], smoothing_kernel, mode='same')

            # Frame-domain window: the same span expressed in spectrogram frames.
            onset_i_ts_fr = _clamped_index(ts_fr, onset_t)
            sustain_i_ts_fr = _clamped_index(ts_fr, sustain_t)
            centroids_window = np.linspace(onset_i_ts_fr, sustain_i_ts_fr, num=graph_size, dtype=int)
            centrds = centroids[centroids_window]

            data_onsets.append(ts[onset_i])
            data_sustains.append(ts[sustain_i])
            data_smoothed_amps.append(smoothed_amps)
            data_centroids.append(centrds)
        data.append([data_onsets, data_sustains, data_smoothed_amps, data_centroids])

    pack_articulations(data, out_dir)

# Export the per-articulation data for the tuned exercises to disk.
create_bin(opt_exercises)