In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
import pandas as pd

import librosa
import librosa.display

In [3]:
# Old version
def norm_corr(x,y):
    if len(x) == 0 or len(y) == 0:
        return [0,0]
    N = x.size + y.size - 1
    # normalize x and y respectively 
    normx = np.linalg.norm(x)
    x1 = x/normx
    normy = np.linalg.norm(y)
    y1 = y/normy
    # pad zeros in front to make the length x+y-1
    x1=np.concatenate([x1, np.zeros(N-x.size)])
    y1 = np.concatenate([y1, np.zeros(N-y.size)])
    #x1 = np.pad(x1, (N-1-x.size, 0), 'constant', constant_values=(0, 0))
    #y1 = np.pad(y1, (N-1-y.size, 0), 'constant', constant_values=(0, 0))
    xfft = np.fft.fft(x1)
    yfft = np.fft.fft(y1)
    corr = np.fft.ifft(np.multiply(np.conjugate(xfft), yfft))

    return corr

def autocorrelate(y, sr):
    # Perform autocorrelation on y
    ncorr = norm_corr(y,y)
    N = y.size

    # Show only the left half (positive x) to compare with spectrogram 
    corr_pos = ncorr[0:N]

    t = np.arange(0, N/sr, 1/sr)

#     # Figure size can be changed here
#     plt.figure(figsize=(8, 6))

#     # Show all integer time values
#     plt.xticks(np.arange(min(t), max(t)+1, 5.0))
#     plt.plot(t, corr_pos)
#     #plt.ylim(bottom = 0, top=0.2)
    
    # Discard the first 2 seconds 
    new_corr_pos = corr_pos[2*sr:]
    new_t = np.arange(2, len(new_corr_pos)/sr+2, 1/sr)
    
#     plt.xticks(np.arange(min(new_t), max(new_t)+1, 8.0))
#     plt.plot(new_t, new_corr_pos)
#     plt.ylabel("corr")
#     plt.xlabel('Time of shift')
#     plt.title("Auto-correlation")

    # Find peaks in autocorrelation result
    # using signal package 
    distance = sr
    peaks, properties = signal.find_peaks(new_corr_pos, distance=distance, height=0)
    heights = properties['peak_heights']
    max_height = heights.max()
    h_thres = max_height * 0.6

    peaks, properties = signal.find_peaks(new_corr_pos, distance=distance, height=h_thres)
    heights = properties['peak_heights']
    peaks_locs = 2+peaks/sr
    
    print("Peak locations (in seconds):", peaks_locs)
    
    # plot peaks 
    plt.figure(figsize=(16, 8))
    plt.title('Peak picking (distance=%2.0f samples, Fs=%3.0f, height >%0.3f)'%(distance, sr,h_thres))
    plt.ylabel("corr")
    plt.xlabel('Time of shift (s)')
    plt.xticks(np.arange(min(new_t), max(new_t)+1, 4.0))
    plt.plot(new_t, new_corr_pos)
    plt.vlines(peaks_locs, 0, 1, color='r', linestyle=':', linewidth=1);
    return peaks_locs

In [4]:
def get_distances_in_measures(distances_s, idx, analysis_dir):
    # convert distances from seconds to beats
    tempo_txt = analysis_dir+idx+'/tempo.txt'
    with open(tempo_txt, 'r') as file:
        # Read tempo of the song
        tempo = int(file.readline())  # beats per minute
        
    song_db = pd.read_csv(analysis_dir+'index.csv')
    song_meta = song_db[song_db['song_id'] == int(idx.lstrip('0'))]
    beats_per_bar = song_meta['num_beats_per_measure'].values[0] 
    beats_per_measure = beats_per_bar*2

    beat_duration = 60/tempo # duration of one beat
    distances_in_beats = distances_s/beat_duration
    distances_in_measures = distances_in_beats/beats_per_measure
    
    return distances_in_measures, beats_per_measure 

def get_beats_per_measure(idx, analysis_dir):
    # convert distances from seconds to beats
    tempo_txt = analysis_dir+idx+'/tempo.txt'
    with open(tempo_txt, 'r') as file:
        # Read tempo of the song
        tempo = int(file.readline())  # beats per minute
        
    song_db = pd.read_csv(analysis_dir+'index.csv')
    song_meta = song_db[song_db['song_id'] == int(idx.lstrip('0'))]
    beats_per_measure = song_meta['num_beats_per_measure'].values[0] 
    
    return beats_per_measure 

In [5]:
import statistics


def affinity_naive(data, threshold):
    ln = len(data)
    A = [[0 for j in range(ln)] for i in range(ln)]
    for i in range(ln):
        for j in range(ln):
            if i>j: # matrix is symmetric
                A[i][j] = A[j][i]
            else:
                corr = np.max(norm_corr(data[i],data[j])[:1000])
                A[i][j] = corr
    return A

def plot_diagnoal(data, i, j, dur):
    for k in range(dur+1):
        data[i+k][j+k] = 1
                            
def affinity_1(data, threshold, shortest,longest):
    ln = len(data)
    A = [[0 for j in range(ln)] for i in range(ln)]
    i = 0
    while i < ln:
        # j > i
        j = i+1
        matching_pairs = []
        while j < ln:
            print(i,j)
            corr = norm_corr(data[i],data[j])[0]
            A[i][j] = corr
            if corr > threshold:
                # stretch that segment diagonally 
                print('stretch')
                curr_len = 0
                curr_i = i
                curr_j = j
                while curr_len < longest:
                    print('curr len:', curr_len)
                    if curr_i+1 == ln or curr_j+1 == ln:
                        break
                    else:
                        curr_i += 1
                        curr_j += 1
                        next_in_diag = norm_corr(data[curr_i],data[curr_j])[0]
                        A[curr_i][curr_j] = next_in_diag
                        if next_in_diag > threshold:
                            curr_len += 1
                        else:
                            break
                if curr_len > shortest:
                    matching_pairs.append([i,j,curr_len])
                    j = curr_j
                else:
                    j+=1
            else:
                j += 1
        # iterate matching pairs to plot for other pairs 
        for k in range(len(matching_pairs)):
            for p in range(k+1,len(matching_pairs)):
                new_i = matching_pairs[k][1]
                new_j = matching_pairs[p][1]
                dur = min(matching_pairs[k][2], matching_pairs[p][2])
                # new_i < new_j
                plot_diagnoal(A,new_i,new_j,dur)
                
        if len(matching_pairs)>0:
            print(matching_pairs)
            i+=matching_pairs[0][2]
        else:
            i+= 1
    
    return A
            
    

def affinity_heuristic(data, distances, shortest,longest):
    ln = len(data)
    thresh = 0.5
    A = [[0 for j in range(ln)] for i in range(ln)]

    # Initialize matrix, compute A[i][i+d] for all d in distances
    for i in range(ln):
        for d in distances:
            if i+d < ln:
                A[i][i+d] = np.max(norm_corr(data[i],data[i+d])[:1000])
                
    
    for i in range(len(data)):
        for j in range(len(data)):
            if i>j: # matrix is symmetric
                A[i][j] = A[j][i]
            if A[i][j]>thresh:
                # stretch that segment diagonally 
                curr_len = 1
                curr_i = i
                curr_j = j
                while curr_len < longest:
                    if curr_i+1 == ln or curr_j+1 == ln:
                        break
                    else:
                        curr_i += 1
                        curr_j += 1
                        next_in_diag = np.max(norm_corr(data[curr_i],data[curr_j])[:1000])
                        A[curr_i][curr_j] = next_in_diag
                        if next_in_diag > thresh:
                            curr_len += 1
                        else:
                            break
    return A

# Diagonal smoothing
def smooth(A):
    smoothed = [[0 for j in range(len(A[0]))] for i in range(len(A))]


    for i in range(len(A)-1):
        for j in range(len(A[0])-1):
            if i > 0 and j > 0:
                smoothed[i][j] = statistics.median([A[i-1][j-1],A[i][j], A[i+1][j+1]])
    return smoothed

In [6]:
def get_beat_segments(y, sr, idx):
    beat_midi = f'../music_structure_analysis/POP909-Dataset-midi/POP909/{idx}/beat_midi.txt'
    chord_midi = f'../music_structure_analysis/POP909-Dataset-midi/POP909/{idx}/chord_midi.txt'
    df_beat = pd.read_csv(beat_midi, header = None, sep =' ', names=['time','b1','b2'])
    df_chord = pd.read_csv(chord_midi, header = None, sep = '\t', names=['start','end','chord'])
    chords = df_chord['chord']
    k = 0 
    found = False
    while not found:
        chord = chords[k]
        if chord != 'N':
            found = True
            break
        k += 1

    df_beat = df_beat.loc[k:].reset_index(drop=True)
    beat_times = df_beat.iloc[:,0]
    
    # Extract audio segments corresponding to each beat
    beat_segments = []

    for i in range(len(beat_times)-1):
        # Calculate sample index corresponding to current beat
        sample_index_start = librosa.time_to_samples(beat_times[i], sr=sr)
        sample_index_end = librosa.time_to_samples(beat_times[i+1], sr=sr)
        # Extract audio segment corresponding to current beat
        beat_segments.append(y[sample_index_start: sample_index_end])
    
    return beat_segments

In [7]:
def merge_beats_to_measures(beat_segments, beats_per_measure):
    # Create a new list to store the merged segments (merge 3/4 beats together)
    merged_segments = []
    if beats_per_measure == 2:
        beats_per_measure = 4
    # Loop over the segments and merge adjacent groups of four segments
    for i in range(0, len(beat_segments), beats_per_measure):
        # Create a new merged segment by concatenating four adjacent segments
        merged_segment = beat_segments[i]
        for j in range(1, beats_per_measure):
            if i+j < len(beat_segments):
                merged_segment = np.concatenate((merged_segment, beat_segments[i+j]))

        # Add the new merged segment to the list of merged segments
        merged_segments.append(merged_segment)
    
    return merged_segments

In [8]:
def get_segments(idx, analysis_dir, midi_dir):
    
    # Load audio and samping rate 
    print('Loading audio...')
    filename = midi_dir+idx+'/'+'melody_extracted.wav'
    print(filename)
    y, sr = librosa.load(filename, sr=None)
    
    # Perform autocorrelation on y to get distances between repetitions
    print('Performing autocorrelation...')
    distances_s = autocorrelate(y,sr)
    
    # Convert distances from seconds to quarters
    # so as to align with human-labels provided
    distances_measures, beats_per_measure = get_distances_in_measures(distances_s, idx, analysis_dir)
    distances_measures = [round(x) for x in distances_measures if x >= 4]
    print(distances_measures)
    
    # Sync audio with beats
    # Read beat times from CSV file
    beat_file = midi_dir+idx+'/beat_audio.txt'
    beat_segments = get_beat_segments(y, sr, idx)
    
    # Merge beats into bars
    print('Merging beats to measures...')
    measure_segments = merge_beats_to_measures(beat_segments, beats_per_measure)
    
    # Construct recurrence matrix 
    min_num_of_measures = 4
    max_num_of_measures = 20 # 20 measures at most
    A = affinity_naive(bar_segments, distances_measures, min_num_of_measures, max_num_of_bars)
#     A_abs = [[abs(A[i][j]) for j in range(len(A[0]))] for i in range(len(A))]
#     A_filt = [[1 if A_abs[i][j] > 0.5 and i != j else 0 for j in range(len(A_abs[0]))] for i in range(len(A_abs))]


    # Plot recurrence matrix 
    print('Plotting recurrence matrix...')
    plt.imshow(A)
    plt.colorbar()
    plt.show()
    
    return A
    
#     # Apply spectral clustering to affinity matrix
#     print('Clustering...')
#     from sklearn.cluster import SpectralClustering
#     n_clusters = 4  # number of clusters to group the audio into
#     clustering = SpectralClustering(n_clusters=n_clusters, affinity='precomputed').fit(A_filt)

#     # Find the boundaries between clusters
#     cluster_boundaries = [0] + list(np.where(np.diff(clustering.labels_))[0] + 1) + [len(clustering.labels_)]
#     print(cluster_boundaries)
#     # Print the boundaries of each cluster
#     for i in range(len(cluster_boundaries) - 1):
#         print(f'Cluster {i + 1}: {cluster_boundaries[i]}-{cluster_boundaries[i+1]-1}')

In [9]:
import csv

def store_recurrence_matrix(idx,analysis_dir,midi_dir):
    
    # Load audio and samping rate 
    print('Loading audio...')
    filename = midi_dir+idx+'/'+idx+'.wav'
    print(filename)
    y, sr = librosa.load(filename, sr=None)
    
#     # Perform autocorrelation on y to get distances between repetitions
#     print('Performing autocorrelation...')
#     distances_s = autocorrelate(y,sr)
#     distances_s= [0,1]
    
    # Convert distances from seconds to quarters
    # so as to align with human-labels provided
    beats_per_measure = get_beats_per_measure(idx, analysis_dir)
#     distances_measures = [round(x) for x in distances_measures if x >= 4]
    
    # Sync audio with beats
    # Read beat times from CSV file
    beat_file = midi_dir+idx+'/beat_audio.txt'
    beat_segments = get_beat_segments(y, sr, idx)
    
    # Merge beats into bars
    print('Merging beats to measures...')
    measure_segments = merge_beats_to_measures(beat_segments, beats_per_measure)
    
    # Construct recurrence matrix 
#     min_num_of_measures = 4
#     max_num_of_measures = 20 # 20 measures at most
    A = affinity_naive(measure_segments,threshold=0)
    A_abs = [[abs(A[i][j]) for j in range(len(A[0]))] for i in range(len(A))]
    
    with open(f'{idx}_RM.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        # write the data to the CSV file
        for row in A_abs:
            writer.writerow(row)
#     A_filt = [[1 if A_abs[i][j] > 0.5 and i != j else 0 for j in range(len(A_abs[0]))] for i in range(len(A_abs))]



In [10]:
import re
import pandas as pd

def get_annotation(path):
    '''
    Reads structure and converts start and end times to symbolic ticks. Each tick is a 16th note.    
    '''
    string = str(np.loadtxt(path, dtype='str'))
    # use regular expressions to separate the alphabet and number characters
    alpha_list = re.findall('[a-zA-Z]', string)
    num_list = [int(s) for s in re.findall(r'\d+', string)]

    # create a dictionary with the separated lists
    data = {'phrase': alpha_list, 'num_bars': num_list}
    
    structure = pd.DataFrame(data)

    # Remove lowercase phrase labels, in Dai et. al 2021, they are non-melodic phrases, and don't repeat in music.
    structure['num_bars'] = pd.to_numeric(structure['num_bars'])
    total_num_of_bars = sum(structure['num_bars'])
    structure['duration'] = structure['num_bars']  # duration = 1 measure (actually 2 bars)
    structure['end'] = structure['duration'].cumsum()
    structure['start'] = structure['end'] - structure['duration']
#     structure = structure[structure['phrase'].str.match(r'(^[A-Z].*)')==True]

    return structure, total_num_of_bars

In [11]:
def build_recurrence_matrix(label):
    structure, total_num_of_bars = get_annotation(label)
    # modify number of bars -> * 2 
    
    A =  [[0 for j in range(total_num_of_bars)] for i in range(total_num_of_bars)]
    groups = structure.groupby('phrase').groups
    print(structure)
    

    for label in groups:
        if label.isupper() and label != 'X':
            seg_idx = groups[label]
            for i in seg_idx:
                for j in seg_idx:
                    if i != j:
#                         print(label,i,j)
                        start_i = structure.iloc[i]['start']
                        start_j = structure.iloc[j]['start']
                        dur = structure.iloc[i]['duration']
                        while dur != 0 and start_i :
                            A[start_i][start_j] = 1
                            start_i+=1 
                            start_j+=1
                            dur -= 1
    plt.imshow(A)
    plt.colorbar()
    plt.show()
    
    return A

In [12]:

# Apply to all files
midi_dir = "../music_structure_analysis/POP909-Dataset-midi/POP909/"
analysis_dir = "../music_structure_analysis/hierarchical-structure-analysis/POP909/"
for i in range(500, 909):
    if i == 9: 
        continue
    # get the index string for filename
    idx = '{:03d}'.format(i)
    store_recurrence_matrix(idx,midi_dir=midi_dir, analysis_dir=analysis_dir)

Loading audio...
../music_structure_analysis/POP909-Dataset-midi/POP909/001/001.wav
Merging beats to measures...


KeyboardInterrupt: 