In [None]:
dc_sax = 'separated/htdemucs_6s/Detective Conan/other.mp3'
dc_vocals = 'separated/htdemucs_6s/Detective Conan/vocals.mp3'
dc_main = 'separated/htdemucs_6s/Detective Conan/main_theme_other.flac'

In [None]:
import essentia.standard as es
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Audio

In [None]:
def extract_and_plot_pitch(file_path, title, vibrato=False, guess_unvoiced=False, voicing_tolerance=0.2):
    # Load audio
    loader = es.MonoLoader(filename=file_path)
    audio = loader()
    sample_rate = loader.paramValue('sampleRate')
    
    # Apply EqualLoudness preprocessing as recommended
    equalLoudness = es.EqualLoudness()
    audio_eq = equalLoudness(audio)
    
    # Extract predominant pitch
    pitch_extractor = es.PredominantPitchMelodia(voiceVibrato=vibrato, guessUnvoiced=guess_unvoiced, voicingTolerance=voicing_tolerance)
    predominant_pitch, pitch_confidence = pitch_extractor(audio_eq)

    # Pitch is estimated on frames. Compute frame time positions.
    pitch_times = np.linspace(0.0,len(audio)/sample_rate,len(predominant_pitch))
    
    # Filter out zero pitch values (unvoiced frames)
    valid_pitch = predominant_pitch[predominant_pitch > 0]
    valid_pitch_idx = np.where(predominant_pitch > 0)[0]
    pitch_times = pitch_times[valid_pitch_idx]

    # Plot the predominant pitch
    plt.figure(figsize=(8, 4))
    # plt.plot(pitch_times, valid_pitch)
    plt.plot(valid_pitch)
    plt.ylabel('Frequency (Hz)')
    plt.xlabel('Time (s)')
    plt.title(f'Predominant Pitch - {title}')
    plt.grid(True)
    
    print(f"--- {title} ---")
    print(f"Average pitch: {valid_pitch.mean():.2f} Hz")
    print(f"Min pitch: {valid_pitch.min():.2f} Hz")
    print(f"Max pitch: {valid_pitch.max():.2f} Hz")
    
    # Synthesize the predominant pitch as a sine wave
    if len(predominant_pitch) > 0:
        # Create time array (assuming 128 hop size at 44.1kHz - Essentia's default)
        hop_size = 128
        time = np.arange(0, len(predominant_pitch) * hop_size) / sample_rate
        
        # Generate sine wave with the extracted pitch
        synth_audio = np.zeros(len(time))
        phase = 0
        for i, freq in enumerate(predominant_pitch):
            if freq > 0:  # Only synthesize frames with valid pitch
                idx_start = i * hop_size
                idx_end = min((i + 1) * hop_size, len(time))
                t = time[idx_start:idx_end] - time[idx_start]
                
                # Generate continuous sine wave
                phase_increment = 2 * np.pi * freq / sample_rate
                frame_length = idx_end - idx_start
                phases = phase + np.arange(frame_length) * phase_increment
                synth_audio[idx_start:idx_end] = np.sin(phases)
                
                # Update phase for next frame to maintain continuity
                phase = phases[-1] + phase_increment
    else:
        synth_audio = np.zeros(1000)  # Empty audio if no pitch detected
    
    # Return the original audio, synthesized pitch audio, and sample rate
    return audio, synth_audio, valid_pitch, pitch_times, sample_rate

In [None]:

# Process each file
print("Extracting and plotting pitch for saxophone part...")
sax_audio, sax_pitch_synth, sax_pitch, sax_pitch_times, sr = extract_and_plot_pitch(dc_sax, "Saxophone")

print("\nExtracting and plotting pitch for vocals...")
vocals_audio, vocals_pitch_synth, vocals_pitch, vocals_pitch_times, sr = extract_and_plot_pitch(dc_vocals, "Vocals")

print("\nExtracting and plotting pitch for main theme...")
main_audio, main_pitch_synth, main_pitch, main_pitch_times, sr = extract_and_plot_pitch(dc_main, "Main Theme")

# Play the audio files
print("\nPlaying original audio files:")
print("Saxophone:")
display(Audio(data=sax_audio, rate=sr))
print("Vocals:")
display(Audio(data=vocals_audio, rate=sr))
print("Main Theme:")
display(Audio(data=main_audio, rate=sr))

# Play the synthesized pitch audio
print("\nPlaying synthesized pitch audio:")
print("Saxophone pitch:")
display(Audio(data=sax_pitch_synth, rate=sr))
print("Vocals pitch:")
display(Audio(data=vocals_pitch_synth, rate=sr))
print("Main Theme pitch:")
display(Audio(data=main_pitch_synth, rate=sr))

In [None]:
%matplotlib inline

import datetime as dt

import matplotlib.dates as dates
import pandas as pd
import stumpy
from matplotlib.patches import Rectangle

plt.style.use('https://raw.githubusercontent.com/TDAmeritrade/stumpy/main/docs/stumpy.mplstyle')

In [None]:
m = 640
fig, axs = plt.subplots(2)
plt.suptitle('Comparison', fontsize='30')
axs[0].set_ylabel("Main Theme", fontsize='20')
axs[0].plot(main_pitch, alpha=0.5, linewidth=1)
axs[1].set_xlabel("Time", fontsize='20')
axs[1].set_ylabel("Vocals", fontsize='20')
axs[1].plot(vocals_pitch, color='C1')
axs[1].plot(vocals_pitch, color='C2')
plt.show()

In [None]:
m = 2048

mp = stumpy.stump(main_pitch.astype(np.float64), m)

In [None]:
motif_idx = np.argsort(mp[:, 0])[0]
print(f"The motif is located at index {motif_idx}")

In [None]:
nearest_neighbor_idx = mp[motif_idx, 1]
print(f"The nearest neighbor is located at index {nearest_neighbor_idx}")


In [None]:
fig, axs = plt.subplots(2, sharex=True, gridspec_kw={'hspace': 0}, figsize=(12, 6))
plt.suptitle('Motif (Pattern) Discovery', fontsize='30')

# Convert indices to time values
motif_time = main_pitch_times[motif_idx]
neighbor_time = main_pitch_times[nearest_neighbor_idx]
window_duration = main_pitch_times[m-1] - main_pitch_times[0]  # Duration of the pattern in seconds

# Plot the main pitch with time on x-axis
axs[0].plot(main_pitch_times, main_pitch)
axs[0].set_ylabel('Main Pitch (Hz)', fontsize='20')

# Get the y-axis limits to set appropriate rectangle height
y_min, y_max = min(main_pitch), max(main_pitch)
height = y_max - y_min

# Add rectangles with appropriate time positioning
rect = Rectangle((motif_time, y_min), window_duration, height, facecolor='lightgrey', alpha=0.5)
axs[0].add_patch(rect)
rect = Rectangle((neighbor_time, y_min), window_duration, height, facecolor='lightgrey', alpha=0.5)
axs[0].add_patch(rect)

# Create time axis for the matrix profile
mp_times = main_pitch_times[:len(mp)]

# Plot the matrix profile with time on x-axis
axs[1].set_xlabel('Time (s)', fontsize='20')
axs[1].set_ylabel('Matrix Profile', fontsize='20')
axs[1].axvline(x=motif_time, linestyle="dashed")
axs[1].axvline(x=neighbor_time, linestyle="dashed")
axs[1].plot(mp_times, mp[:, 0])

plt.tight_layout()
plt.subplots_adjust(top=0.9)
plt.show()

In [None]:
mp[motif_idx, 0]

In [None]:
mp.P_, mp.I_

## Time Series Chains

In [None]:
%matplotlib inline

from scipy.io import loadmat
from matplotlib.patches import Rectangle, FancyArrowPatch
import itertools

In [None]:
all_chain_set, unanchored_chain = stumpy.allc(mp[:, 2], mp[:, 3])

In [None]:
plt.figure(figsize=(12, 6))
# Plot the main pitch with time on x-axis
plt.plot(main_pitch_times, main_pitch, linewidth=1, color='black')

# Plot the unanchored chains
for i in range(unanchored_chain.shape[0]):
    start_idx = unanchored_chain[i]
    end_idx = min(unanchored_chain[i] + m, len(main_pitch))
    
    # Get the corresponding time values and pitch segment
    y = main_pitch[start_idx:end_idx]
    x = main_pitch_times[start_idx:end_idx]
    
    plt.plot(x, y, linewidth=3)

# Add time markers at 10-second intervals
color = itertools.cycle(['white', 'gainsboro'])
min_time = min(main_pitch_times)
max_time = max(main_pitch_times)
pitch_min = min(main_pitch)
pitch_max = max(main_pitch)
height = pitch_max - pitch_min

# Create time markers at every 10 seconds
for i, t in enumerate(range(int(min_time), int(max_time), 10)):
    rect = Rectangle((t, pitch_min), 10, height, 
                    facecolor=next(color), alpha=0.3)
    plt.gca().add_patch(rect)

plt.title("Unanchored Chain in Main Theme Pitch Data", fontsize=20)
plt.xlabel("Time (s)", fontsize=15)
plt.ylabel("Frequency (Hz)", fontsize=15)
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6))
# Remove plt.axis('off') to show the axes

# Create a list to store segment start times for x-tick positions
segment_start_times = []
segment_labels = []

for i in range(unanchored_chain.shape[0]):
    # Get start and end indices for this segment
    start_idx = unanchored_chain[i]
    end_idx = min(start_idx + m, len(main_pitch))
    
    # Extract the pitch segment and corresponding time values
    data = main_pitch[start_idx:end_idx]
    time_values = main_pitch_times[start_idx:end_idx]
    
    # Calculate time offset for visual spacing between segments
    segment_duration = time_values[-1] - time_values[0]
    time_offset = i * (segment_duration + 2)  # 2 second gap between segments
    
    # Store the start position for x-ticks
    segment_start_times.append(time_offset)
    segment_labels.append(f"{time_values[0]:.1f}s")
    
    # Add vertical separators between segments
    if i > 0:
        plt.axvline(x=time_offset - 1, alpha=0.3)
        plt.axvline(x=time_offset - 0.5, alpha=0.3, linestyle='-.')
    
    # Plot the pitch segment with time-based x-axis
    plt.plot(time_values - time_values[0] + time_offset, data, linewidth=3)
    
    # Add segment label
    plt.text(time_offset, min(data), f"Original: {time_values[0]:.1f}s", 
             fontsize=10, verticalalignment='bottom')

# Set x-ticks at the start of each segment
plt.xticks(segment_start_times, segment_labels)

plt.grid(alpha=0.3)
plt.xlabel("Segment Start Time in Original Audio (s)", fontsize=15)
plt.ylabel("Frequency (Hz)", fontsize=15)
plt.title("Pitch Segments in Unanchored Chain", fontsize=20)
plt.show()

## Approximate Matrix Profile 

In [None]:
def compare_approximation(true_P, approx_P):
    fig, ax = plt.subplots(gridspec_kw={'hspace': 0})

    ax.set_xlabel('Time', fontsize ='20')
    # ax.axvline(x=643, linestyle="dashed")
    # ax.axvline(x=8724, linestyle="dashed")
    # ax.set_ylim((5, 28))
    ax.plot(approx_P, color='C1', label="Approximate Matrix Profile")
    ax.plot(true_P, label="True Matrix Profile")
    ax.legend()
    plt.show()

seed = np.random.randint(100000)
np.random.seed(seed)
approx = stumpy.scrump(main_pitch.astype(np.float64), m, percentage=0.01, pre_scrump=False)

approx.update()
approx_P = approx.P_

compare_approximation(mp[:, 0], approx_P)

In [None]:
for _ in range(9):
    approx.update()
    
approx_P = approx.P_

In [None]:
compare_approximation(mp[:, 0], approx_P)

In [None]:
approx = stumpy.scrump(main_pitch.astype(np.float64), m, percentage=0.01, pre_scrump=True, s=None)
approx.update()
approx_P = approx.P_

In [None]:
compare_approximation(mp[:, 0], approx_P)

## Pre-defined Pattern Matching
https://stumpy.readthedocs.io/en/latest/Tutorial_Pattern_Matching.html

In [None]:
import numpy.testing as npt
import os
import pandas as pd

In [None]:
corpus = '../Star-Wars-Thematic-Corpus/'
# List all files in the corpus directory
files = os.listdir(corpus)

# Print the files
print("Files in corpus:")
for file in files:
    print(f"- {file}")

# Optionally, print full paths
print("\nFull paths:")
for file in files:
    print(f"- {os.path.join(corpus, file)}")
motif_1a = '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/1a_Main_Theme_Basic_(A_Section).musicxml'
motif_1b = '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/1b_Main_Theme_Basic_(B_Section).musicxml'
motif_2 = '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/2_Rebel_Fanfare_Basic.musicxml'
motif_3 = "/Users/sid/Developer/UPF/SMC/Motif Discovery/Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/3_Force_Theme_Basic.musicxml"
motif_4 = '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Krns/4_Leias_Theme_Basic.krn'
motif_5 = '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/5_Death_Star_Basic.musicxml'
motif_10a = "/Users/sid/Developer/UPF/SMC/Motif Discovery/Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/10_Imperial_March_Basic.musicxml"
audio_file = '/Users/sid/Developer/UPF/SMC/Motif Discovery/source-separation/separated/htdemucs_ft/46. John Williams & London Symphony Orchestra - Episode IV - Main Title/other.flac'

In [None]:
import crepe
import librosa
import numpy as np
from IPython.display import Audio

import matplotlib.pyplot as plt

def extract_melody_with_crepe(file_path, title, sr=16000, model_capacity="full", viterbi=True, confidence_threshold=0.0, constrain_octave=False):
    """
    Extract melody from audio using CREPE pitch tracking algorithm
    
    Parameters:
    -----------
    file_path : str
        Path to audio file
    title : str
        Title for plots and output
    sr : int
        Sample rate for CREPE (default 16000 Hz)
    model_capacity : str
        CREPE model capacity ('tiny', 'small', 'medium', 'large', or 'full')
    viterbi : bool
        Whether to use Viterbi decoding for pitch tracking
    confidence_threshold : float
        Threshold for confidence values (pitches with lower confidence will be set to 0)
    constrain_octave : bool
        Whether to constrain the pitch to a single octave
        
    Returns:
    --------
    audio : np.ndarray
        Original audio samples
    synth_audio : np.ndarray
        Synthesized audio from pitch
    pitch : np.ndarray
        Extracted pitch values in Hz
    pitch_times : np.ndarray
        Time positions for each pitch value
    confidence : np.ndarray
        Confidence values for each pitch estimate
    original_sr : int
        Original sample rate of audio file
    """
    # Load audio
    print(f"Loading audio from {file_path}...")
    audio, original_sr = librosa.load(file_path, sr=None, mono=True)
    
    # Resample for CREPE if needed
    if original_sr != sr:
        print(f"Resampling from {original_sr}Hz to {sr}Hz for CREPE...")
        audio_resampled = librosa.resample(audio, orig_sr=original_sr, target_sr=sr)
    else:
        audio_resampled = audio
    
    # Extract pitch with CREPE
    print(f"Extracting pitch using CREPE ({model_capacity} model)...")
    pitch_times, pitch, confidence, _ = crepe.predict(
        audio_resampled, 
        sr, 
        model_capacity=model_capacity, 
        viterbi=viterbi,
        verbose=1
    )
    
    # Filter out low confidence regions
    filtered_pitch = np.copy(pitch)
    filtered_pitch[confidence < confidence_threshold] = 0
    
    # Create valid pitch mask for plotting and statistics
    valid_mask = filtered_pitch > 0
    if np.any(valid_mask):
        valid_pitch = filtered_pitch[valid_mask]
        
        # Constrain to a single octave if requested
        if constrain_octave and len(valid_pitch) > 0:
            print("Constraining pitch to a single octave...")
            
            # Convert to MIDI note numbers (log scale)
            midi_notes = 12 * np.log2(valid_pitch / 440) + 69
            
            # Find the most common octave
            octaves = np.floor(midi_notes / 12)
            unique_octaves, counts = np.unique(octaves, return_counts=True)
            predominant_octave = unique_octaves[np.argmax(counts)]
            
            # Calculate octave range
            octave_min = predominant_octave * 12
            octave_max = octave_min + 12
            
            # Function to constrain a note to the predominant octave
            def constrain_to_octave(note):
                while note < octave_min:
                    note += 12
                while note >= octave_max:
                    note -= 12
                return note
            
            # Apply constraint to all notes
            constrained_midi = np.array([constrain_to_octave(note) for note in midi_notes])
            
            # Convert back to Hz
            constrained_pitch = 440 * 2**((constrained_midi - 69) / 12)
            
            # Replace the valid pitch values with constrained ones
            filtered_pitch[valid_mask] = constrained_pitch
            valid_pitch = filtered_pitch[valid_mask]
            
            print(f"Predominant octave: {int(predominant_octave)}")
            print(f"Pitch range constrained to: {440 * 2**((octave_min - 69) / 12):.2f}Hz - {440 * 2**((octave_max - 69) / 12):.2f}Hz")
        
        # Statistics
        print(f"--- {title} ---")
        print(f"Average pitch: {np.mean(valid_pitch):.2f} Hz")
        print(f"Min pitch: {np.min(valid_pitch):.2f} Hz")
        print(f"Max pitch: {np.max(valid_pitch):.2f} Hz")
        print(f"Confidence threshold: {confidence_threshold}")
        print(f"Percentage of frames above threshold: {np.mean(valid_mask) * 100:.2f}%")
        
        # Plot the pitch contour
        plt.figure(figsize=(12, 8))
        
        # Plot the pitch with confidence as color intensity
        plt.subplot(2, 1, 1)
        plt.scatter(pitch_times, pitch, c=confidence, cmap='viridis', 
                   alpha=0.75, s=5, label='Raw pitch')
        plt.plot(pitch_times[valid_mask], valid_pitch, 'r-', linewidth=1, label='Filtered pitch')
        plt.colorbar(label='Confidence')
        plt.ylabel('Frequency (Hz)')
        plt.title(f'Pitch Contour - {title}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # Plot the confidence
        plt.subplot(2, 1, 2)
        plt.plot(pitch_times, confidence)
        plt.axhline(y=confidence_threshold, color='r', linestyle='--', 
                   label=f'Threshold ({confidence_threshold})')
        plt.ylabel('Confidence')
        plt.xlabel('Time (s)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # Synthesize audio from pitch
        print("Synthesizing audio from extracted pitch...")
        synth_audio = np.zeros_like(audio)
        
        # Only synthesize for frames with valid pitch
        pitch_frames = np.round(pitch_times * original_sr).astype(int)
        phase = 0
        
        for i, idx in enumerate(pitch_frames):
            if idx >= len(synth_audio) or not valid_mask[i]:
                continue
                
            freq = filtered_pitch[i]
            if freq > 0:
                # Calculate samples until next pitch value or end of audio
                next_idx = pitch_frames[i+1] if i+1 < len(pitch_frames) else len(synth_audio)
                samples = next_idx - idx
                if samples <= 0:
                    continue
                    
                # Generate continuous sine wave with proper phase
                t = np.arange(samples) / original_sr
                phase_increment = 2 * np.pi * freq / original_sr
                phases = phase + np.arange(samples) * phase_increment
                synth_audio[idx:next_idx] = np.sin(phases)
                
                # Update phase for continuity
                phase = phases[-1] + phase_increment
    else:
        print(f"No valid pitch detected in {title}")
        valid_pitch = np.array([])
        
    return audio, synth_audio, filtered_pitch, pitch_times, confidence, original_sr

In [None]:
# Extract melody from the audio file using CREPE
main_theme_audio, main_theme_pitch_synth, main_theme_pitch, main_theme_pitch_times, main_theme_confidence, main_theme_sr = extract_melody_with_crepe(audio_file, "Star Wars - Main Theme")

# Play the original and synthesized audio
print("Playing original main_theme March audio:")
display(Audio(data=main_theme_audio, rate=main_theme_sr))

print("Playing synthesized pitch audio:")
display(Audio(data=main_theme_pitch_synth, rate=main_theme_sr))

In [None]:

# audio_file = "/Users/sid/Downloads/Torrents/Music/John Williams & London Symphony Orchestra - Star Wars - The Ultimate Digital Collection (2016 - Soundtracks) [Flac 24-44_192]/22. John Williams, London Symphony Orchestra & London Voices - Episode II - Departing Coruscant.flac"

# Process each file
print("Extracting and plotting pitch for Main theme part...")
sw_audio, sw_pitch_synth, sw_pitch, sw_pitch_times, sr = extract_and_plot_pitch(audio_file, "Main Theme")

# Estimate the tempo from the audio
rhythm_extractor = es.RhythmExtractor2013(method="multifeature")
sw_bpm, sw_beats, sw_beats_confidence, _, sw_beats_intervals = rhythm_extractor(sw_audio)

print(f"Estimated BPM: {sw_bpm}")

# Play the audio files
print("\nPlaying original audio files:")
print("Main Theme:")
display(Audio(data=sw_audio, rate=sr))

# Play the synthesized pitch audio
print("\nPlaying synthesized pitch audio:")
print("Main Theme pitch:")
display(Audio(data=sw_pitch_synth, rate=sr))

In [None]:
from music21 import chord, converter, harmony, note

In [None]:
import numpy as np
import pandas as pd
from music21 import converter, note, chord, harmony
import stumpy
from IPython.display import display

def extract_midi_notes(score):
    """Extract MIDI notes from a music21 score"""
    notes = []
    for element in score.flatten().notes:
        if isinstance(element, note.Note):
            notes.append(element.pitch.midi)
        elif isinstance(element, chord.Chord):
            for pitch in element.pitches:
                notes.append(pitch.midi)
        elif isinstance(element, harmony.ChordSymbol):
            continue
    return notes

def extract_notes_with_duration(score):
    """Extract notes with duration and offset information"""
    notes_with_duration = []
    for element in score.flatten().notes:
        if isinstance(element, note.Note):
            notes_with_duration.append({
                'pitch': element.pitch.midi,
                'duration': element.duration.quarterLength,
                'offset': element.offset
            })
    return notes_with_duration

def plot_pitch_contour(notes_df, title="Pitch Contour"):
    """Plot the pitch contour for visualization"""
    plt.figure(figsize=(12, 6))
    plt.plot(notes_df['offset'], notes_df['pitch'], 'o-')
    plt.xlabel('Offset (quarter notes)')
    plt.ylabel('MIDI Pitch')
    plt.title(title)
    plt.grid(True)
    plt.show()

def midi_to_hz(midi_pitch):
    """Convert MIDI pitch to Hz (frequency)"""
    return 440 * (2**((midi_pitch - 69) / 12))

def prepare_motif_for_matching(notes_df, estimated_tempo, audio_pitch_times):
    """Prepare the motif for pattern matching with stumpy"""
    quarter_note_duration = 60 / estimated_tempo  # in seconds
    motif_times = []
    motif_hz_resampled = []
    current_time = 0
    
    for i, row in notes_df.iterrows():
        duration_seconds = row['duration'] * quarter_note_duration
        num_samples = int(duration_seconds * (len(audio_pitch_times) / (audio_pitch_times[-1] - audio_pitch_times[0])))
        if num_samples > 0:
            times = np.linspace(current_time, current_time + duration_seconds, num_samples)
            motif_times.extend(times)
            motif_hz_resampled.extend([midi_to_hz(row['pitch'])] * num_samples)
        current_time += duration_seconds
    
    return np.array(motif_hz_resampled, dtype=np.float64), np.array(motif_times)

def find_motif_match(motif_hz_resampled, audio_pitch):
    """Find the motif in audio using stumpy"""
    audio_pitch_array = np.array(audio_pitch, dtype=np.float64)
    distance_profile = stumpy.mass(motif_hz_resampled, audio_pitch_array)
    match_idx = np.argmin(distance_profile)
    min_distance = np.min(distance_profile)
    
    return match_idx, min_distance, distance_profile

def plot_motif_match(audio_pitch_times, audio_pitch, match_idx, motif_hz_resampled, min_distance):
    """Plot the match between the motif and the audio pitch"""
    plt.figure(figsize=(12, 6))
    plt.plot(audio_pitch_times, audio_pitch, label='Audio Pitch')
    
    end_idx = min(match_idx + len(motif_hz_resampled), len(audio_pitch))
    match_times = audio_pitch_times[match_idx:end_idx]
    match_pitch = audio_pitch[match_idx:end_idx]
    
    plt.plot(match_times, match_pitch, linewidth=3, label='Match')
    
    start_time = audio_pitch_times[match_idx]
    end_time = audio_pitch_times[end_idx-1] if end_idx < len(audio_pitch_times) else audio_pitch_times[-1]
    plt.axvline(start_time, color='r', linestyle='--', alpha=0.5)
    plt.axvline(end_time, color='r', linestyle='--', alpha=0.5)
    plt.text(start_time, max(audio_pitch)*0.9, f'Start: {start_time:.2f}s', rotation=90, verticalalignment='top')
    plt.text(end_time, max(audio_pitch)*0.9, f'End: {end_time:.2f}s', rotation=90, verticalalignment='top')
    
    plt.title(f'Motif Match in Audio (Match distance: {min_distance:.2f})')
    plt.xlabel('Time (s)')
    plt.ylabel('Frequency (Hz)')
    plt.legend()
    plt.show()
    
    return start_time, end_time

def process_motif_file(file_path, audio_pitch, audio_pitch_times, tempo):
    """Process a motif file and find its match in the audio"""
    # Parse score
    score = converter.parse(file_path)
    
    # Extract notes
    notes = extract_midi_notes(score)
    print(f"Extracted {len(notes)} MIDI notes, first 10: {notes[:10] if notes else []}")
    
    # Extract notes with duration
    notes_with_duration = extract_notes_with_duration(score)
    notes_df = pd.DataFrame(notes_with_duration)
    
    # Plot pitch contour
    title = f"Pitch Contour of {file_path.split('/')[-1]}"
    plot_pitch_contour(notes_df, title)
    
    # Prepare motif for matching
    motif = notes_df.sort_values('offset')[['pitch']]
    print(f"Prepared time series with {len(motif)} notes for pattern matching")
    
    # Convert to Hz and resample
    motif_hz_resampled, motif_times = prepare_motif_for_matching(
        notes_df, tempo, audio_pitch_times
    )
    
    # Find match
    match_idx, min_distance, distance_profile = find_motif_match(
        motif_hz_resampled, audio_pitch
    )
    
    # Plot match
    start_time, end_time = plot_motif_match(
        audio_pitch_times, audio_pitch, match_idx, motif_hz_resampled, min_distance
    )
    
    return {
        'match_idx': match_idx,
        'distance': min_distance,
        'start_time': start_time,
        'end_time': end_time,
        'score': score,
        'notes_df': notes_df,
        'motif_hz': motif_hz_resampled
    }

# Example usage
import matplotlib.pyplot as plt

# Import required libraries if not already imported

# Define the motif files to process
motif_files = [
    '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/1a_Main_Theme_Basic_(A_Section).musicxml',
    '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/1b_Main_Theme_Basic_(B_Section).musicxml',
    '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/2_Rebel_Fanfare_Basic.musicxml',
    # '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Krns/4_Leias_Theme_Basic.krn',
    '../Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/5_Death_Star_Basic.musicxml'
    ]

# Use Star Wars pitch data and tempo for matching
audio_pitch = sw_pitch
audio_pitch_times = sw_pitch_times
estimated_tempo = 60 / quarter_note_duration  # Convert quarter note duration to BPM

# Process each motif file
results = {}
for motif_file in motif_files:
    print(f"\nProcessing motif: {motif_file.split('/')[-1]}")
    motif_name = motif_file.split('/')[-1].split('.')[0]
    results[motif_name] = process_motif_file(motif_file, audio_pitch, audio_pitch_times, estimated_tempo)

# Summarize the results
print("\n\nSummary of Results:")
print("=" * 80)
for motif_name, result in results.items():
    print(f"Motif: {motif_name}")
    print(f"  Match found at {result['start_time']:.2f}s to {result['end_time']:.2f}s")
    print(f"  Match distance: {result['distance']:.2f}")
    print("-" * 80)


In [None]:
# import converter21
# converter21.register()

# sample_file = '/Users/sid/Developer/UPF/SMC/Motif Discovery/Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Krns/4_Leias_Theme_Basic.krn'
sample_file = '/Users/sid/Developer/UPF/SMC/Motif Discovery/Star-Wars-Thematic-Corpus/OriginalTriologyThemeFiles/Xmls/3_Force_Theme_Basic.musicxml'
score = converter.parse(sample_file)

# Initialize an empty list for MIDI notes
notes = []

# Iterate through elements in the score
for element in score.flatten().notes:
    # Handle different types of elements
    if isinstance(element, note.Note):
        # Regular note with pitch
        notes.append(element.pitch.midi)
    elif isinstance(element, chord.Chord):
        # Chord with multiple pitches
        for pitch in element.pitches:
            notes.append(pitch.midi)
    elif isinstance(element, harmony.ChordSymbol):
        # Skip ChordSymbol objects
        continue

# Print the first few notes to check
print(f"Extracted {len(notes)} MIDI notes, first 10: {notes[:10] if notes else []}")
# We need to re-process the score to include duration information
notes_with_duration = []

for element in score.flatten().notes:
    if isinstance(element, note.Note):
        # Store pitch and duration as tuple
        notes_with_duration.append({
            'pitch': element.pitch.midi,
            'duration': element.duration.quarterLength,
            'offset': element.offset
        })
    # Skip chords completely - don't process chord pitches

# Convert to dataframe for easier manipulation
notes_df = pd.DataFrame(notes_with_duration)

# Plot the pitch contour
plt.figure(figsize=(12, 6))
plt.plot(notes_df['offset'], notes_df['pitch'], 'o-')
plt.xlabel('Offset (quarter notes)')
plt.ylabel('MIDI Pitch')
plt.title('Pitch Contour of the Main Theme')
plt.grid(True)

# Prepare the time series for pattern matching with stumpy
# First, ensure we have pitch values in chronological order
motif = notes_df.sort_values('offset')[['pitch']]

print(f"Prepared time series with {len(motif)} notes for pattern matching")

# Convert MIDI pitches from the score to Hz (frequency)
def midi_to_hz(midi_pitch):
    return 440 * (2**((midi_pitch - 69) / 12))

# Convert the motif notes to Hz
motif_hz = motif['pitch'].apply(midi_to_hz).values

# Resample the motif to match the time scale of the audio
# The timing needs to be adjusted based on tempo
estimated_tempo = sw_bpm  # BPM - adjust based on your audio
quarter_note_duration = 60 / estimated_tempo  # in seconds

# Create a time series from the motif with appropriate timing
motif_times = []
motif_hz_resampled = []
current_time = 0

for i, row in notes_df.iterrows():
    # Calculate duration in seconds
    duration_seconds = row['duration'] * quarter_note_duration
    # Create samples for this note's duration at a similar sample rate to your audio pitch
    num_samples = int(duration_seconds * (len(main_pitch_times) / (main_pitch_times[-1] - main_pitch_times[0])))
    if num_samples > 0:
        times = np.linspace(current_time, current_time + duration_seconds, num_samples)
        motif_times.extend(times)
        motif_hz_resampled.extend([midi_to_hz(row['pitch'])] * num_samples)
    current_time += duration_seconds

# Now use stumpy to find the motif in the audio pitch contour
# Convert both arrays to float64 dtype as required by stumpy

motif_hz_resampled_array = np.array(motif_hz_resampled, dtype=np.float64)
sw_pitch_array = np.array(sw_pitch, dtype=np.float64)

distance_profile = stumpy.mass(motif_hz_resampled_array, 
                              sw_pitch_array)
match_idx = np.argmin(distance_profile)
# Plot the match
plt.figure(figsize=(12, 6))
plt.plot(sw_pitch_times, sw_pitch, label='Audio Pitch')

# Calculate the end index for plotting, ensuring we don't go out of bounds
end_idx = min(match_idx + len(motif_hz_resampled), len(sw_pitch))
match_times = sw_pitch_times[match_idx:end_idx]
match_pitch = sw_pitch[match_idx:end_idx]

# Plot the match with thicker line
plt.plot(match_times, match_pitch, linewidth=3, label='Match')

# Label the start and end timestamps
start_time = sw_pitch_times[match_idx]
end_time = sw_pitch_times[end_idx-1] if end_idx < len(sw_pitch_times) else sw_pitch_times[-1]
plt.axvline(start_time, color='r', linestyle='--', alpha=0.5)
plt.axvline(end_time, color='r', linestyle='--', alpha=0.5)
plt.text(start_time, max(sw_pitch)*0.9, f'Start: {start_time:.2f}s', rotation=90, verticalalignment='top')
plt.text(end_time, max(sw_pitch)*0.9, f'End: {end_time:.2f}s', rotation=90, verticalalignment='top')

# Display match quality
min_distance = np.min(distance_profile)
plt.title(f'Motif Match in Audio (Match distance: {min_distance:.2f})')
plt.xlabel('Time (s)')
plt.ylabel('Frequency (Hz)')
plt.legend()
plt.show()


In [None]:
# Find motif matches with exclusion zones to prevent clustering
k = 20
exclusion_zone = len(motif_hz_resampled_array) // 2  # Half the motif length as exclusion zone

# Initialize arrays to store matches
match_indices = []
match_distances = []

# Make a copy of the distance profile that we can modify
working_distances = distance_profile.copy()

# Find k matches with exclusion zones
for i in range(k):
    # Find the best match in the current working distances
    idx = np.argmin(working_distances)
    match_indices.append(idx)
    match_distances.append(distance_profile[idx])
    
    # Apply exclusion zone around this match
    exclusion_start = max(0, idx - exclusion_zone)
    exclusion_end = min(len(working_distances), idx + exclusion_zone)
    working_distances[exclusion_start:exclusion_end] = np.inf

# Calculate the length of the motif
motif_length = len(motif_hz_resampled_array)

# Set up the plot
plt.figure(figsize=(12, 6))

# Plot the Star Wars pitch contour
plt.plot(sw_pitch_times, sw_pitch, linewidth=1, color='black', alpha=0.5)
plt.title('Star Wars Main Theme - Motif Matches', fontsize=20)
plt.xlabel('Time (s)', fontsize=15)
plt.ylabel('Frequency (Hz)', fontsize=15)

# Plot all the motif matches with a thicker line and different colors
for i, idx in enumerate(match_indices):
    end_idx = min(idx + motif_length, len(sw_pitch))
    
    # Get start and end times for this match
    start_time = sw_pitch_times[idx]
    end_time = sw_pitch_times[end_idx-1] if end_idx < len(sw_pitch_times) else sw_pitch_times[-1]
    
    plt.plot(sw_pitch_times[idx:end_idx], sw_pitch[idx:end_idx], linewidth=2, 
             label=f'Match {i+1}: {start_time:.2f}s')
    
    # Add a marker at the start of each match
    plt.plot(sw_pitch_times[idx], sw_pitch[idx], 'o', markersize=5)
    
    # Add vertical lines with timestamps for top 5 matches
    if i < 5:
        plt.axvline(x=start_time, color=f'C{i+1}', linestyle='--', alpha=0.5)
        plt.axvline(x=end_time, color=f'C{i+1}', linestyle='--', alpha=0.5)
        
        # Add timestamp labels
        y_offset = pitch_min + (i * height / 10)  # Stagger labels vertically
        plt.text(start_time, y_offset, f'{start_time:.2f}s', 
                 rotation=90, fontsize=8, va='bottom', ha='right')
        plt.text(end_time, y_offset, f'{end_time:.2f}s', 
                 rotation=90, fontsize=8, va='bottom', ha='left')

# Add a legend for the first 5 matches to avoid cluttering
plt.legend(loc='upper right', fontsize=8, ncol=2, bbox_to_anchor=(1.15, 1))

# Add time markers at 10-second intervals
color = itertools.cycle(['white', 'gainsboro'])
min_time = min(sw_pitch_times)
max_time = max(sw_pitch_times)
pitch_min = min(sw_pitch)
pitch_max = max(sw_pitch)
height = pitch_max - pitch_min

# Create time markers at every 10 seconds
for i, t in enumerate(range(int(min_time), int(max_time), 10)):
    rect = Rectangle((t, pitch_min), 10, height, 
                   facecolor=next(color), alpha=0.3)
    plt.gca().add_patch(rect)

# Print distances of top 5 matches with start and end times
print("Top 5 matches:")
for i in range(min(5, len(match_indices))):
    idx = match_indices[i]
    start_time = sw_pitch_times[idx]
    end_idx = min(idx + motif_length, len(sw_pitch))
    end_time = sw_pitch_times[end_idx-1] if end_idx < len(sw_pitch_times) else sw_pitch_times[-1]
    duration = end_time - start_time
    print(f"Match {i+1}: {start_time:.2f}s to {end_time:.2f}s (duration: {duration:.2f}s) with distance {match_distances[i]:.4f}")

plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


In [None]:
from dtaidistance import dtw
from dtaidistance import dtw_visualisation as dtwvis
import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial.distance import euclidean

# Define m - length of the pattern to consider (adjust as needed)
m = 2048  # Using a reasonable default based on your data
# Define the motif index using the predefined sw_motif_index 
motif_idx = sw_motif_index  # Using the defined motif index instead of match_indices[0]
end_idx_motif = min(motif_idx + m, len(sw_pitch))
motif_segment = sw_pitch[motif_idx:end_idx_motif]

# Create a list to store DTW results for all matches
dtw_results = []

# Convert to time domain segments for visualization
motif_times = sw_pitch_times[motif_idx:end_idx_motif]
motif_time_adjusted = motif_times - motif_times[0]  # Start at 0

# Process each match
for i, idx in enumerate(match_indices[:10]):  # Process top 10 matches
    end_idx = min(idx + m, len(sw_pitch))
    match_segment = sw_pitch[idx:end_idx]
    match_times = sw_pitch_times[idx:end_idx]
    
    # Make sure we have data before adjusting
    if len(match_times) > 0:
        match_time_adjusted = match_times - match_times[0]  # Start at 0
        
        # Calculate DTW between motif and this match
        dtw_distance = dtw.distance(motif_segment, match_segment)
        
        # Calculate warping path
        path = dtw.warping_path(motif_segment, match_segment)
        
        # Store results
        dtw_results.append({
            'index': idx,
            'distance': dtw_distance,
            'segment': match_segment,
            'times': match_times,
            'path': path,
            'start_time': match_times[0]
        })

# Sort matches by DTW distance (best to worst)
if dtw_results:  # Check if we have results
    dtw_results.sort(key=lambda x: x['distance'])

    # Plot top 5 matches with time on x-axis
    fig, axs = plt.subplots(6, 1, figsize=(14, 16))

    # Plot the motif first
    axs[0].plot(motif_time_adjusted, motif_segment)
    axs[0].set_title('Original Motif', fontsize=14)
    axs[0].set_ylabel('Frequency (Hz)')
    axs[0].grid(True, alpha=0.3)

    # Plot top 5 matches
    for i in range(min(5, len(dtw_results))):
        result = dtw_results[i]
        match_times = result['times'] - result['times'][0]  # Adjust to start at 0
        
        axs[i+1].plot(match_times, result['segment'], color=f'C{i+1}')
        axs[i+1].set_title(f'Match {i+1} - Start time: {result["start_time"]:.2f}s, DTW distance: {result["distance"]:.2f}', 
                          fontsize=12)
        axs[i+1].set_ylabel('Frequency (Hz)')
        axs[i+1].grid(True, alpha=0.3)
        
        if i == 4:  # For the last subplot
            axs[i+1].set_xlabel('Time (s)')

    plt.tight_layout()
    plt.show()

    # Create a distance matrix visualization for the best match
    best_match = dtw_results[0]
    plt.figure(figsize=(12, 8))

    # Create distance matrix
    distance_matrix = np.zeros((len(motif_segment), len(best_match['segment'])))
    for i in range(len(motif_segment)):
        for j in range(len(best_match['segment'])):
            distance_matrix[i, j] = abs(motif_segment[i] - best_match['segment'][j])

    # Plot the distance matrix and warping path
    plt.imshow(distance_matrix, origin='lower', cmap='viridis', aspect='auto')
    plt.plot([p[1] for p in best_match['path']], [p[0] for p in best_match['path']], 'r-')
    plt.title(f'DTW Warping Path - Best Match (Start time: {best_match["start_time"]:.2f}s)')
    plt.xlabel('Match Segment Frame')
    plt.ylabel('Motif Segment Frame')
    plt.colorbar(label='Distance')
    plt.tight_layout()
    plt.show()

    # Print summary of all DTW distances
    print("\nDTW Distance Ranking:")
    for i, result in enumerate(dtw_results):
        print(f"Rank {i+1}: Match at {result['start_time']:.2f}s - Distance: {result['distance']:.2f}")
else:
    print("No valid matches were found for DTW analysis")

In [None]:
from stumpy import core
import stumpy

In [None]:
m = 4096
ab_mp = stumpy.stump(T_A = motif_hz_resampled_array, 
                        m = m,
                        T_B = sw_pitch_array,
                        ignore_trivial = False)

In [None]:
ab_mp

In [None]:
motif_index = ab_mp[:, 0].argmin()

plt.xlabel('Subsequence')
plt.ylabel('Matrix Profile')

plt.scatter(motif_index, 
               ab_mp[motif_index, 0],
               c='red',
               s=100)

plt.plot(ab_mp[:,0])

plt.show()

In [None]:
print(f'The motif is located at index {motif_index} of "SW Main Theme (MusicXML)"')

In [None]:
sw_motif_index = ab_mp[motif_index, 1]
print(f'The motif is located at index {sw_motif_index} of "SW Main Theme"')

In [None]:
# Get the time values for the SW pitch motif
sw_motif_times = sw_pitch_times[sw_motif_index:sw_motif_index+m]
# Adjust time to start from 0 for better comparison
sw_motif_times = sw_motif_times - sw_motif_times[0]

# Create time array for the MusicXML motif with same duration as SW motif
motif_duration = sw_motif_times[-1]  # Duration in seconds
musicxml_times = np.linspace(0, motif_duration, len(z_norm_motif))

# Z-normalize the pitch arrays
z_norm_motif = core.z_norm(motif_hz_resampled_array[motif_index : motif_index + m])
sw_z_norm_motif = core.z_norm(sw_pitch_array[sw_motif_index:sw_motif_index+m])

# Plot using time on x-axis
plt.figure(figsize=(12, 6))
plt.plot(musicxml_times, z_norm_motif, label='SW Main Theme (MusicXML)')
plt.plot(sw_motif_times, sw_z_norm_motif, label='SW Main Theme')

plt.xlabel('Time (seconds)')
plt.ylabel('Z-normalized Frequency')
plt.title('Comparison of Matching Motifs')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


### Stumpy.match

In [None]:
matches = stumpy.match(motif_hz_resampled_array, sw_pitch_array)

In [None]:
# Extract match information
# In stumpy.match output, the first column is the index and the second column is the distance
match_indices = matches[:, 0].astype(int)
match_distances = matches[:, 1]

# Create a dataframe to display results with timestamps
results_df = pd.DataFrame({
    'Index': match_indices,
    'Time (s)': sw_pitch_times[match_indices],
    'Distance': match_distances
})

# Sort by distance (best matches first)
results_df = results_df.sort_values('Distance')

# Display top 10 matches with timestamps
print(f"Top 10 matches of the motif in the audio:")
display(results_df.head(10))

# Plot top 5 matches
plt.figure(figsize=(12, 6))
plt.plot(sw_pitch_times, sw_pitch, linewidth=1, color='gray', alpha=0.5)

# Determine motif length for highlighting matches
motif_length = len(motif_hz_resampled_array)

# Plot top 5 matches with different colors
for i, idx in enumerate(results_df.head(5)['Index']):
    end_idx = min(idx + motif_length, len(sw_pitch))
    match_times = sw_pitch_times[idx:end_idx]
    match_pitch = sw_pitch[idx:end_idx]
    
    plt.plot(match_times, match_pitch, linewidth=2, 
             label=f'Match {i+1}: {sw_pitch_times[idx]:.2f}s')

plt.title('Top 5 Matches of Motif in Audio', fontsize=15)
plt.xlabel('Time (s)', fontsize=12)
plt.ylabel('Frequency (Hz)', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# DTW

In [None]:
from dtwParallel import dtw_functions
from scipy.spatial import distance as d

# For Univariate Time Series
x = motif_hz_resampled_array
y = sw_pitch_array

dtw_functions.dtw(x,y,local_dissimilarity=d.euclidean, n_threads=10, get_visualization=True)

In [None]:
x = motif_hz_resampled_array
y = sw_pitch_array

In [None]:
from dtaidistance import dtw
from dtaidistance import dtw_visualisation as dtwvis

path = dtw.warping_path(x, y)
dtwvis.plot_warping(x, y, path, filename="warp.png")

In [None]:
from dtwParallel import dtw_functions
from scipy.spatial import distance as d
import matplotlib.pyplot as plt

# Get top 5 matches for DTW comparison
top_n = 2
dtw_results = []

plt.figure(figsize=(15, 10))

# Loop through top matches
for i in range(min(top_n, len(match_indices))):
    idx = match_indices[i]
    match_end = min(idx + motif_length, len(sw_pitch))
    
    # Extract the corresponding segment from the Star Wars pitch contour
    match_segment = sw_pitch_array[idx:match_end]
    
    # Perform DTW between the motif and this match segment
    dtw_result = dtw_functions.dtw(
        motif_hz_resampled_array, 
        match_segment,
        local_dissimilarity=d.euclidean, 
        n_threads=10, 
        get_visualization=True
    )
    
    dtw_results.append(dtw_result)
    
    # Plot in subplot
    plt.subplot(top_n, 1, i+1)
    plt.title(f"Match {i+1} at {sw_pitch_times[idx]:.2f}s, DTW Distance: {dtw_result['dtw_value']:.2f}")
    plt.imshow(dtw_result['visualization_data'], aspect='auto', cmap='viridis', origin='lower')
    plt.colorbar(label='Distance')

plt.tight_layout()
plt.show()

# Print summary of DTW distances
print("\nDTW Distance Summary:")
for i, result in enumerate(dtw_results):
    idx = match_indices[i]
    print(f"Match {i+1} at {sw_pitch_times[idx]:.2f}s: {result['dtw_value']:.2f}")