In [1]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
sys.path.append('../..') # leave the notebooks folder into root as the main

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dtw import *
import scipy
import librosa

from app.core.audio.AudioData import AudioData
from app.core.audio.AudioPlayer import AudioPlayer
from app.core.midi.MidiData import MidiData
from app.core.midi.MidiPlayer import MidiPlayer
from app.core.midi.MidiSynth import MidiSynth

from app.algorithms.pitch.PYin import PYin
from app.config import AppConfig
from app.core.recording.Pitch import Pitch, PitchConfig

Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



In [3]:
AUDIO_FILEPATH = '../../app/resources/audio/user_fugue2.mp3'
MIN_VIOLIN_FREQ = 196.0
SAMPLE_RATE = 44100

user_audio_data = AudioData()
user_audio_data.load_data(AUDIO_FILEPATH)
pyinner = PYin()
pitches = pyinner.pyin(user_audio_data.data)

Processing frame 4589/4589
Done!


In [4]:
# Create a synth with a soundfont
SOUNDFONT_FILEPATH = '../../app/resources/MuseScore_General.sf3'
midi_synth = MidiSynth(SOUNDFONT_FILEPATH)

# Load the midi file into a MidiData object
MIDI_FILEPATH = '../../app/resources/midi/fugue.mid'
midi_data = MidiData(MIDI_FILEPATH)

# Create MidiSynth/Player objects
midi_player = MidiPlayer(midi_synth)
midi_player.load_midi(midi_data)

# Also synthesize + convert MIDI to audio (for later CQT extraction)
midi_audio_data = AudioData()
midi_audio_data.load_midi_file(MIDI_FILEPATH, SOUNDFONT_FILEPATH)

# midi_player.play(start_time=0) # Play the MIDI

Loading MidiSynth...
Synth + soundfont loaded.


## Flow final implementation extravaganza

### data structures
- create pitchdict, getPitch
- create Note, UserString, MidiString
- create EditOperation

### edit algorithm
- refine overall flow
- define insertion/deletion/replacement cost (in terms of how we prioritize)
- create generous-logic

In [None]:
from dataclasses import dataclass
from math import ceil

class Note:
    def __init__(self, pitch: float, start: float, end: float, volume: float=None):
        self.pitch = pitch
        self.start = start
        self.end = end
        self.volume = volume

class NoteString:
    def __init__(self):
        self.notes: list = []

    def append_note(self, note: Note):
        self.notes.append(note)

    def insert_note(self, note: Note, index: int):
        self.notes.insert(index, note)

    def get_note(self, note_index: int):
        return self.notes[note_index]

class UserString(NoteString):
    def __init__(self):
        """handles storing multiple note estimtes of different rank"""
        super().__init__()
        self.notes: list[list[Note]] = []

    def append_note(self, notes: list[Note]):
        self.notes.append(notes)

    def insert_note(self, notes: list[Note], index: int):
        self.notes.insert(index, notes)

    def get_note(self, note_index: int, rank: int=None):
        if not rank:
            return self.notes[note_index]
        return self.notes[note_index][rank]

    def get_distance(self, note_string2: 'NoteString', note_i1: int, note_i2: int):
        """returns the minimum distance between the user notes and the midi"""
        # note distance computations in here...
        note_1 = self.get_note(note_i1) # returns the list
        note_2 = note_string2.get_note(note_i2)

        return min(abs(n_1i.pitch - note_2.pitch) for n_1i in note_1)

class MidiString(NoteString):
    def __init__(self, midi_data: MidiData):
        """midi specific string operations"""
        super().__init__()
        self.load_midi(midi_data)

    def load_midi(self, midi_data: MidiData):
        """Load a midi_data and convert into a NoteString object for string editing"""
        for i, note in midi_data.pitch_df.iterrows():
            new_note = Note(
                pitch=note['pitch'],
                start=note['start'],
                end=note['start'] + note['duration'],
                volume=note['velocity']
            )
            self.append_note(new_note)

class NoteDetector:
    def __init__(self, recording: 'Recording'):
        self.recording = recording

    def get_window_pitch(self, window, method='median') -> float:
        """returns the pitch of a window"""
        return np.median(window)

    def get_window_slope(self, window, start_time, end_time) -> tuple[float, float]:
        """computes the slope of a window of pitches with least squares regression"""
        x = np.linspace(0, end_time-start_time, len(window))

        # get the slope + intercept
        A = np.vstack([x, np.ones_like(x)]).T
        slope, intercept = np.linalg.lstsq(A, window)[0]

        # now use it to reconstruct what the pitches should be
        predicted_window = slope*x + intercept

        # and remove all pitches way off from the prediction
        VARIANCE_THRESHOLD = 3
        residuals = np.abs(window - predicted_window)
        window_2 = np.where(residuals < VARIANCE_THRESHOLD, window, np.nan)
        window_2 = window_2[~np.isnan(window_2)]

        # again! slope + intercept for real this time 😎
        A = np.vstack([x, np.ones_like(x)]).T
        slope, intercept = np.linalg.lstsq(A, window)[0]
        return slope, intercept
        


    def detect_notes(self, start_time: float=0, end_time: float=None, rank: int=0, w: int=30, pitch_thresh: float=0.6, slope_thresh: float=1):
        """
        Detect different-enough pitches (midi_numbers) with a rolling median on a window, size
        = [w]. Compare the current median window pitch to the next one, and keep 
        track of when the difference exceeds [threshold].

        Args:
            start_time: start time (sec) in recording to detect pitches from
            end_time: when to stop detecting pitches, None defaults to end
            rank: probability path to detect, 0=most probable, 3=least probable
            w: size of each window for comparisons (median, slope)
            pitch_thresh: min pitch median difference to count the window as a new note
            slope_thresh: min slope flatness to count a collection of pitches as a 'note'
        """

        note_string = UserString()
        pitches = self.recording.get_pitches(start_time=start_time, end_time=end_time, rank=rank)

        HOP_SIZE = int(w/2)

        # === Finding first note ===
        # keep going until we find a flat enough slope to call it
        # the beginning of our first note

        i_0 = 0 # index of our first note
        pitch_0 = None
        for i in range(0, len(pitches) - w-1, HOP_SIZE):
            window = np.array([p.midi_num for p in pitches[i : i+w]])

            # compute median + slope
            pitch = self.get_window_pitch(window)
            slope, intercept = self.get_window_slope(
                window, 
                start_time=pitches[i].time, 
                end_time=pitches[i+w].time
            )

            if abs(slope) < slope_thresh:
                # found our first note, break
                i_0 = i
                pitch_0 = pitch
                break

        # === FILLING IN NOTE_STRING ===
        # iteration variables
        last_note_start = i_0
        last_note_pitch = pitch_0

        for i in range(i_0, len(pitches) - w-1, HOP_SIZE):
            # get current window
            window = np.array([p.midi_num for p in pitches[i : i+w]])

            # compute median + slope
            pitch = self.get_window_pitch(window)
            slope, intercept = self.get_window_slope(
                window, 
                start_time=pitches[i].time, 
                end_time=pitches[i+w].time
            )

            print(f"time = {pitches[i].time:.2f} | pitch = {pitch}, slope = {slope}")

            # ignore the section if it's not flat enough
            if abs(slope) > slope_thresh:
                continue
            
            # check if it's a significantly different pitch than the previous
            if abs(pitch - last_note_pitch) > pitch_thresh:
                # get median volume
                volumes = [p.volume for p in pitches[last_note_start : i+w]]
                med_volume = np.median(volumes)
                
                # create new note with data and append to note_string
                new_note = Note(
                    pitch=last_note_pitch,
                    start=pitches[last_note_start].time, 
                    end=pitches[i].time,
                    volume=med_volume
                )

                # add a bunch of note possibilities
                N_RANKS = 3
                new_notes = [new_note]
                for j in range(1, N_RANKS):
                    pitches_i = self.recording.get_pitches(
                        start_time=pitches[last_note_start].time, 
                        end_time=pitches[i].time,
                        rank=j
                    )
                    median_pitch = np.median([p.midi_num for p in pitches_i])
                    note_i = Note(
                        pitch=median_pitch,
                        start=pitches[last_note_start].time, 
                        end=pitches[i].time,
                        volume=med_volume
                    )
                    new_notes.append(note_i)

                note_string.append_note(new_notes)

                last_note_pitch = pitch
                last_note_start = i # update the start of our new note

        return note_string


# data structures
class Recording:
    def __init__(self):
        self.audio_data: AudioData = AudioData()
        self.pyinner = PYin(sr=44100, f0_min=196, f0_max=5000, tuning=440)
        self.pitches = None
        self.note_detector = NoteDetector(self)

    def load_audio(self, audio_filepath: str=None):
        """Load the audio from a filepath and create AudioData for it"""
        self.audio_data.load_data(audio_filepath)

    def detect_pitches(self):
        """Detect pitches for the audio in 'audio_data' using PYIN algorithm"""
        self.pitches = self.pyinner.pyin(self.audio_data.data)

    def get_pitch(self, time: float=None, rank: int=0):
        """
        Get a pitch from the user's pitches, based on the closest time 
        to the one provided. Returns an error if the rank (0 > 1 > 2 > ... probability of pitch)
        is invalid.
        
        Args:
            time: The time of the pitch you want to query
            rank: 0 for most probable, 1 for second most probable, etc.
        """
        sample_idx = int(time * self.audio_data.sample_rate)
        pitch_idx = round(sample_idx / self.pyinner.HOP_SIZE)
        return self.pitches[pitch_idx][rank]
    
    def get_pitches(self, start_time: float, end_time: float=None, rank: int=0):
        """
        Gets all the pitches from the given start to the end time.
        Tries to get the rank of the pitch provided, but defaults to what's available.
        (ie, not all times may have multiple pitch estimates)
        """
        if not end_time:
            end_time = len(self.pitches) * self.pyinner.HOP_SIZE / self.audio_data.sample_rate
        
        start_idx = round(start_time * self.audio_data.sample_rate / self.pyinner.HOP_SIZE)
        end_idx = round(end_time * self.audio_data.sample_rate / self.pyinner.HOP_SIZE)

        # clamp
        start_idx = max(0, min(start_idx, len(self.pitches) - 1))
        end_idx = max(0, min(end_idx, len(self.pitches) - 1))

        pitches = self.pitches[start_idx:end_idx]
        pitches = [
            next((p[max(rank-k, 0)] for k in range(rank + 1) if len(p) > max(rank-k, 0)), None)
            for i, p in enumerate(pitches) if p
        ]
        return pitches
    

In [22]:
# Test out the 'Recording' flow
AUDIO_FILEPATH = '../../app/resources/audio/user_fugue2.mp3'

user_recording = Recording()
user_recording.load_audio(AUDIO_FILEPATH)
user_recording.detect_pitches()

Processing frame 4589/4589
Done!


In [26]:
p = user_recording.get_pitches(start_time=0)
note_string = user_recording.note_detector.detect_notes(start_time=0, end_time=None, rank=0, w=14, pitch_thresh=0.6, slope_thresh=5)

time = 0.00 | pitch = 62.07711298332481, slope = 1.5924847585236155
time = 0.02 | pitch = 62.09607847075252, slope = 0.6034752025583839
time = 0.04 | pitch = 62.10656197313677, slope = 0.7471526653616966
time = 0.06 | pitch = 62.12753432372088, slope = 0.7537853026725165
time = 0.08 | pitch = 62.14356085515463, slope = 0.6029402260685449
time = 0.10 | pitch = 62.15527887943904, slope = 0.35439225677224706
time = 0.12 | pitch = 62.15822635972506, slope = -0.09215632419989377
time = 0.14 | pitch = 62.15515933749697, slope = -0.4339500060326031
time = 0.16 | pitch = 62.14917226604109, slope = -0.18910020984406345
time = 0.18 | pitch = 62.145452352178566, slope = -0.2518757833236572
time = 0.20 | pitch = 62.1385699726754, slope = -0.478720054535811
time = 0.22 | pitch = 62.131805099302866, slope = -0.112322139450484
time = 0.24 | pitch = 62.12959715912933, slope = -0.2937070416612924
time = 0.26 | pitch = 62.11669143975957, slope = -1.0797595162603444
time = 0.28 | pitch = 62.1021907765591

  slope, intercept = np.linalg.lstsq(A, window)[0]
  slope, intercept = np.linalg.lstsq(A, window)[0]


In [None]:
# View the pitch estimates in the app
import sys
sys.path.append('..')

from app.ui.plots.PitchPlot import RunPitchPlot
from app.core.midi.MidiData import MidiData
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QCoreApplication


if __name__ == '__main__':
    if not QCoreApplication.instance():
        app = QApplication(sys.argv)
    else:
        app = QCoreApplication.instance()

    # ALIGNED_MIDI_FILEPATH = 'aligned.mid'
    # aligned_midi_data = MidiData(ALIGNED_MIDI_FILEPATH)

    # most_likely_pitches = [p[0] for p in pitches]
    # pitchplot = RunPitchPlot(
    #     app, midi_data=midi_data, pitches=most_likely_pitches, 
    # )
    pitchplot = RunPitchPlot(
        app, midi_data=midi_data, pitches=p, note_string=note_string
    )

Plotting pitches...
Done!


qt.pointer.dispatch: skipping QEventPoint(id=0 ts=0 pos=0,0 scn=1076.08,381.699 gbl=1076.08,381.699 Released ellipse=(1x1 ∡ 0) vel=0,0 press=-1076.08,-381.699 last=-1076.08,-381.699 Δ 1076.08,381.699) : no target window
qt.pointer.dispatch: skipping QEventPoint(id=1 ts=0 pos=0,0 scn=1174.18,423.818 gbl=1174.18,423.818 Released ellipse=(1x1 ∡ 0) vel=0,0 press=-1174.18,-423.818 last=-1174.18,-423.818 Δ 1174.18,423.818) : no target window
qt.pointer.dispatch: skipping QEventPoint(id=2 ts=0 pos=0,0 scn=1129,391.957 gbl=1129,391.957 Released ellipse=(1x1 ∡ 0) vel=0,0 press=-1129,-391.957 last=-1129,-391.957 Δ 1129,391.957) : no target window
qt.pointer.dispatch: skipping QEventPoint(id=3 ts=0 pos=0,0 scn=983.02,314.557 gbl=983.02,314.557 Released ellipse=(1x1 ∡ 0) vel=0,0 press=-983.02,-314.557 last=-983.02,-314.557 Δ 983.02,314.557) : no target window
qt.pointer.dispatch: skipping QEventPoint(id=4 ts=0 pos=0,0 scn=1038.82,368.558 gbl=1038.82,368.558 Released ellipse=(1x1 ∡ 0) vel=0,0 press

: 

## algorithm: string editing

string edit, keeping track of where each edit came from with a backpointer matrix, then retrace our steps at the end to get a final set of edits the user made wrt. the MIDI.

or in other words, returns a list of all the user's mistakes 😂

In [15]:
user_string = note_string
midi_string = MidiString(midi_data)

In [16]:
class Mistake:
    def __init__(self, type, user_note, midi_note):
        self.type = type
        self.user_note = user_note
        self.midi_note = midi_note

class Alignment:
    def __init__(self, notes: list[tuple[Note, Note]], mistakes: list[Mistake]):
        self.notes = notes
        self.mistakes = mistakes

    def merge_notes(i: int, j: int):
        """merges notes i with j (= i+1) from the list"""
        pass

    def insert_note(note, i):
        """inserts note into position i in notes"""
        pass

    def update_pitch(note, new_pitch):
        """updates the pitch of a note"""
        pass

def string_edit(user_string: UserString, midi_string: MidiString):
    """run string editing on the two user and midi strings.
    returns the alignment result
    """
    # setup dp matrix
    N = len(midi_string.notes)
    M = len(user_string.notes)

    mat = np.zeros([N+1, M+1], dtype=np.float64)
    backpointer = np.zeros([N+1, M+1], dtype=np.int64)

    # string edit costs
    INSERTION_COST = 1.5
    DELETION_COST = 2
    SUBSTITUTION_COST = 1
    TOLERANCE = 1

    # initialize first row / column
    mat[0, :] = np.cumsum([0]+[INSERTION_COST]*M) # all insertions
    mat[:, 0] = np.cumsum([0]+[DELETION_COST]*N) # all deletions

    for i in range(1, N+1):
        for j in range(1, M+1):

            top = mat[i-1, j]
            diag = mat[i-1, j-1] 
            left = mat[i, j-1]

            SUBSTITUTION_COST = 1
            note_distance = user_string.get_distance(midi_string, j-1, i-1)
            if abs(note_distance) < TOLERANCE: # being generous, the tolerance
                SUBSTITUTION_COST = 0 # same note pitch

            top_three = np.array([
                top + DELETION_COST,
                diag + SUBSTITUTION_COST,
                left + INSERTION_COST
            ])
            mat[i, j] = np.min(top_three)
            backpointer[i, j] = np.argmin(top_three)

    # traceback the backpointer
    i = N
    j = M

    mistakes = []
    notes = []
    while i>0 or j>0:

        mistake_type = backpointer[i, j]
        user_note = user_string.get_note(j-1) if j > 0 else None
        midi_note = midi_string.get_note(i-1) if i > 0 else None

        # deletion
        if mistake_type==0 and i>0:
            mistakes.append(
                Mistake(type="deletion", user_note=user_note, midi_note=midi_note)
            )
            notes.append((None, midi_note))
            i -= 1

        # substitution / no change
        if mistake_type==1 and i>0 and j>0:
            note_distance = user_string.get_distance(midi_string, j-1, i-1)
            if abs(note_distance) >= TOLERANCE:
                mistakes.append(
                    Mistake(type="substitution", user_note=user_note, midi_note=midi_note)
                )
            notes.append((user_note, midi_note))
            i -= 1
            j -= 1

        # insertion
        if mistake_type==2 and j>0:
           mistakes.append(
                Mistake(type="insertion", user_note=user_note, midi_note=midi_note)
            ) 
           j -= 1
           notes.append((user_note, None))

    notes = list(reversed(notes))
    mistakes = list(reversed(mistakes))
    return Alignment(notes, mistakes)

### test the string editing
run it on our recording example

In [17]:
alignment = string_edit(user_string, midi_string)

print("USER MISTAKES\n---")
for mistake in alignment.mistakes:
    print(f"time={mistake.user_note[0].start:.3f} | error={mistake.type}, user={mistake.user_note[0].pitch}, midi={mistake.midi_note.pitch}")

print("\nUSER ALIGNMENT\n---")
for user_note, midi_note in alignment.notes:
    if not user_note:
        print(f"deletion! midi={midi_note.pitch}")
    elif not midi_note:
        print(f"insertion! user={user_note[0].pitch}")
    else:
        print(f"user={user_note[0].pitch}, midi={midi_note.pitch}")

USER MISTAKES
---
time=0.467 | error=substitution, user=61.07803691740145, midi=69.0
time=9.163 | error=deletion, user=69.43439209414399, midi=70.0
time=12.434 | error=deletion, user=74.22826585376716, midi=79.0
time=13.044 | error=substitution, user=74.24309009619844, midi=78.0

USER ALIGNMENT
---
user=62.07711298332481, midi=62.0
user=61.07803691740145, midi=69.0
user=73.01330909055145, midi=73.0
user=75.98422931982638, midi=76.0
user=76.82162154857173, midi=77.0
user=81.14560417665929, midi=81.0
user=74.2066036746053, midi=74.0
user=72.29733784294304, midi=72.0
user=71.33828622354201, midi=71.0
user=74.16959117492235, midi=74.0
user=76.78747906180304, midi=77.0
user=80.08843791049944, midi=80.0
user=79.28312558690217, midi=79.0
user=77.11862231701149, midi=77.0
user=75.15636705977516, midi=75.0
user=74.28918736781232, midi=74.0
user=74.94527029505142, midi=75.0
user=67.0316863738491, midi=67.0
user=71.86753429253774, midi=72.0
user=73.84732459899436, midi=74.0
user=74.99961621936986

## generous-logic

for mistake in mistakes, we double check - are we sure?
1. if insertion
	- of octave multiple - be generous call it the same note
	- of different note - check for lower probability path through
2. if deletion
	- of same note - run onset detection and see if we detect anything
	- of different note - run onset detection and check for lower prob path
3. if substitution
	- check for lower prob path through

In [None]:
# inspect the alignment
print("\nUSER ALIGNMENT\n---")
for user_note, midi_note in alignment.notes:
    if not user_note:
        print(f"deletion! midi={midi_note.pitch}")
    elif not midi_note:
        print(f"insertion! user={user_note[0].pitch}")
    else:
        print(f"user={user_note[0].pitch}, midi={midi_note.pitch}")


USER ALIGNMENT
---
user=62.09741146720759, midi=62.0
user=61.07802387070343, midi=69.0
user=72.99405501006687, midi=73.0
user=75.97938887902814, midi=76.0
user=76.88053989245769, midi=77.0
user=81.49265387291221, midi=81.0
insertion! user=69.25755568427789
user=74.20704227727585, midi=74.0
user=72.23469529479229, midi=72.0
user=71.28734760508368, midi=71.0
user=74.19389828616086, midi=74.0
user=76.8334643712785, midi=77.0
user=80.09632292390567, midi=80.0
user=79.28022137422411, midi=79.0
user=77.0598023250059, midi=77.0
user=75.15187652412254, midi=75.0
user=74.24473771864378, midi=74.0
user=74.9518239874587, midi=75.0
insertion! user=67.03941059143096
user=70.81472410015658, midi=67.0
user=71.76196840766283, midi=72.0
user=73.91648257680443, midi=74.0
user=75.03549225028486, midi=75.0
user=79.034997095236, midi=79.0
user=72.1587042596085, midi=72.0
user=70.2502565695921, midi=70.0
user=69.16071956337322, midi=69.0
user=72.06738971480551, midi=72.0
user=75.04617228367904, midi=75.0
u

In [None]:
PITCH_TOLERANCE = 1
for mistake in alignment.mistakes:

    if mistake.type == "insertion":
        # harmonic checking
        diff = abs(mistake.user_note.pitch%12 - mistake.midi_note.pitch%12)
        if diff < PITCH_TOLERANCE:
            # merge the notes


    elif mistake.type == "deletion":
        if abs(mistake.user - mistake.midi) < PITCH_TOLERANCE:
            # run onset detection and see if anything new is inside there
            continue
        else:
            # is there a lower probability path through?
            for i in range(1, 2):
                
    elif mistake.type == "substitution":
        pass
