In [1]:
import numpy as np
import os
import pdb
import evaluate
import librosa
import math
import mir_eval
import IPython.display as ipd
from matplotlib import pyplot as plt
from scipy.signal import medfilt as medfilt
from scipy.misc import electrocardiogram
from scipy import signal
from scipy.ndimage import maximum_filter1d as maxfilt

In [2]:
#odf generators
threshold = np.asarray([[0.1], [0.125], [0.2], [0.25], [0.0], [0.0], [0.0]])
winSize_secs = 0.04
hopSize_secs = 0.01
wd=3
selected_odf = 1

dataDir = './BallroomData'
annotDir = './BallroomAnnotations'
# dataDir = './BallroomAuditor'

# define reasonable limitations
bpm_limits = [61, 200]
tau_limits = []
for bpm in bpm_limits:
    freq = bpm / 60
    tau = round(1 / (freq * hopSize_secs))
    tau_limits.append(tau)

# beat tracking
analysis_fraction = 4 #analyses the first 1/n samples of the song to determine beat 1 location
half_sqr_width = 5
performance_deviation = 4
tempo_dec_tol = .20 #percent
tempo_inc_tol = .10
alpha = 1
final_offset = 0 #postprocessing

if os.path.isdir('beats') == False:
    os.makedirs('beats')

In [3]:
def get_all_odf(filename, winSize_secs, hopSize_secs):
#     print('file is: ', filename)
    snd, rate = librosa.load(filename, sr=None)
    hop = round(rate * hopSize_secs)
    winLen_samps = rate * winSize_secs
    # round up to next power of 2
    wlen_samps = int(2 ** np.ceil(np.log2(winLen_samps)))
#     print('window length is: ', wlen_samps)
#     print('hop length is: ', hop)
    # centre frames: first frame at t=0 - Kind of a hack way of thinking about it
    snd = np.concatenate([np.zeros(wlen_samps//2), snd, np.zeros(wlen_samps//2)])
    # how many frames will cover this sound recording - Do the '- wlen_samps' and  '+1' cancel each other out?
    frameCount = int(np.floor((len(snd) - wlen_samps) / hop + 1))
#     print('Frames per audio file', frameCount)
    #odf is an object that holds 7 types of measurements for onsets
    odf = np.zeros((7, frameCount))
    ham_wind = np.hamming(wlen_samps)
    #preempting prev values needed
    prevM = np.zeros(wlen_samps)
    prevA = np.zeros(wlen_samps)
    prevprevA = np.zeros(wlen_samps)
    # cycle through audio and add each calculation to the appropriate odf frame as you go
    for i in range(frameCount):
        start = i * hop
        # mutiply signal frame by hamming frame and get fft of this
        # remember that the fft consists of a real and imaginary number
        currentFrame = np.fft.fft(snd[start: start+wlen_samps] * ham_wind)
        
        """Root Mean Square"""
        mags = np.abs(currentFrame)
        squared_mags = np.power(mags, 2)
        mean_sqr_mags = np.mean(squared_mags)
        rms = np.sqrt(mean_sqr_mags)
        odf[0][i] = rms
#         print('rms', rms)
        
        """High Freq Content"""
        # Since mags is mirrored after the Nyquist index, multiplying this by a mirrored index and getting its
        # mean will be the same as getting the mean of the first half of the mags 
        k_vector = list(range(wlen_samps//2)) + list(range(wlen_samps//2,0,-1))
        hfc_calc1 = np.mean(np.multiply(squared_mags, k_vector))
        odf[1][i] = hfc_calc1
        
        # hfc while trying to follow the algorithm (especially where k=-N/2)
        weight_vector = np.arange(wlen_samps) - wlen_samps//2
        # half the values cancel the other half out, except for 1, which is left
        left_over = np.sum(np.multiply(weight_vector, squared_mags))
        hfc_calc2 = 4/(wlen_samps*(wlen_samps+2)) * np.sum(np.multiply(weight_vector, squared_mags))
        
        """Spectral FLuz"""       
        sf = np.mean(np.multiply(np.subtract(mags, prevM),np.greater(mags, prevM)))
#         print('sf',sf)
        odf[2][i] = sf
        
        #apply half-wave retifier logic
        h0 = ((mags - prevM) + abs(mags - prevM))/2
        # when h0 element is less than 0, make it zero (using boolean array method)
        H = h0[h0 > 0]
        sf2 = np.sum(H) / wlen_samps
        
        """Complex Domain"""
        
        phase = np.angle(currentFrame)
        tPhase = np.subtract(np.multiply(prevA, 2), prevprevA)
        cdVector = np.sqrt(np.subtract(np.add(np.power(prevM,2), np.power(mags,2)),
                        np.multiply(np.multiply(np.multiply(prevM, mags), 2),
                        # this is the princarg phase wrapping?
                        np.cos(np.subtract(phase, tPhase)))))
        odf[3][i] = np.mean(cdVector)
        rcd = np.mean(np.multiply(np.greater_equal(mags, prevM), cdVector))
        odf[4][i] = rcd
        
        """Phase Deviation"""
        pdVector = np.abs(np.divide(np.subtract(np.mod(np.add(
                        np.subtract(phase, tPhase), np.pi), 2 * np.pi), np.pi), np.pi))
        odf[5][i] = np.mean(pdVector)
        if rms != 0:
            wpd = np.divide(np.mean(np.multiply(pdVector, mags)), rms * 2)
        else:
            wpd=0
        odf[6][i] = wpd
        
        rate_change_phase = phase - prevA
        i_component=(phase + rate_change_phase)
        target_complex_value = mags * np.exp(i_component)
        prevM = mags
        prevA = phase
    return odf

def normalize(arr):
    mx = arr.max(axis=1)
    for i in range(mx.shape[0]):
        if mx[i] > 0:
            arr[i,:] /= mx[i]
    return arr

def process_odf(selected_odf, maxFiltODF, threshold_array):
    # convert odf to a signal that marks the instance and magnitude of the onset only
    caught_odf = threshold_array[selected_odf]
    odf = maxFiltODF[selected_odf]
    threshold_odf = threshold[selected_odf]
    type(threshold_odf), threshold_odf
    caught_odf[caught_odf == threshold_odf[0]] = 0
    # trim threshold values so there only a single peak present every 50ms (Dixon 2001)
    onset_peak_indices, peak_value_dict = signal.find_peaks(caught_odf, height=0, distance=10)
    peak_times = onset_peak_indices * hopSize_secs
    peak_values = peak_value_dict['peak_heights']
    trimmed_odf = np.zeros_like(caught_odf)
    trimmed_odf[onset_peak_indices] = peak_values
    return trimmed_odf, onset_peak_indices

def acfTempo(trimmed_odf, tau_limits):
    # autocorrelate the entire odf signal with itself
    autocor = np.correlate(trimmed_odf, trimmed_odf, mode='full')[int(len(np.correlate(trimmed_odf, trimmed_odf, mode='full'))//2)+1:]
    acf_peak_indices, acf_peak_value_dict = signal.find_peaks(autocor, height=np.max(autocor)/2, distance=5)
    acf_peak_values = acf_peak_value_dict['peak_heights']
    peaked_autocor = np.zeros_like(autocor)
    peaked_autocor[acf_peak_indices] = acf_peak_values
    best_tau_samples = np.argpartition(autocor, -4)[-4:]
    peak_values = autocor[best_tau_samples]
    #most reasonable value is between tau_limits
    for best_tau_sample in best_tau_samples:
        if best_tau_sample > tau_limits[1] and best_tau_sample < tau_limits[0]:
            tactus_tau = best_tau_sample
        else:
            tactus_tau = np.min(best_tau_samples)
    #  add 1 to tactus_tau (due to 0 indexing) to get the true sample delay
    tactus_tau = tactus_tau + 1
    return tactus_tau

def beatTimes_to_array(trueOnsets, trimmed_odf):
    # converts the trueOnsets to a usable array for visualisations
    trueOnsets_resampled = trueOnsets * 100
    trueOnsets_beats = np.zeros_like(trimmed_odf)
    for beat_time in trueOnsets_resampled:
        beat_time = int(beat_time)
        trueOnsets_beats[beat_time] = 1
    return trueOnsets_beats

def find_first_beat(trimmed_odf, tactus_tau, onset_peak_indices):
    # make a pulse train
    window_length = len(trimmed_odf)//analysis_fraction
    # THIS GIVES YOU A VISUALISATION OF HOW THEY MATCH UP
    # Larcohe 2003, Alonos 2004
    # -5 to 5 used to represent precentage that tempo can vary in either direction
    sqr_pulse = np.zeros_like(trimmed_odf)
    for i in range(len(trimmed_odf)//tactus_tau):
        index = i * tactus_tau
        for i in range(-half_sqr_width,half_sqr_width):
            sqr_pulse[index+i] = 1

    correlation_at_onsets = []
    # assume the first 10 onsets - one of them is the downbeat
    initial_onsets_to_analyse = 10
    # just empircally testing to scan first 6 onsets and choose the best one that will tell me where the first onset is
    for onset_peak_index in onset_peak_indices[:initial_onsets_to_analyse]:
        trimmed_odf_window = trimmed_odf[onset_peak_index : onset_peak_index+window_length]
        if len(trimmed_odf_window) < window_length:
            break
        pulse_window = sqr_pulse[:window_length]
        summed_sigs = np.sum(trimmed_odf_window * pulse_window)
        correlation_at_onsets.append(summed_sigs)
    # plt.plot(correlation_at_onsets)
    onset_peak_index = np.where(np.asarray(correlation_at_onsets) == np.max(np.asarray(correlation_at_onsets)))[0][0]
    first_beat_index = onset_peak_indices[onset_peak_index]
    return first_beat_index

def find_beats(trimmed_odf, tactus_tau, first_beat_index):
    beat_indices = [first_beat_index]

    tactus_dec_tol = round(tactus_tau * tempo_dec_tol) #tempo increase means increasing tau value
    tactus_inc_tol = round(tactus_tau * tempo_inc_tol) #tempo increase means decreasing tau value
    outside_perf_region = 0
    missing_onsets = 0
    within_tempo_region = 0

    while beat_indices[-1] < len(trimmed_odf)-tactus_tau:
        next_beat_pred = beat_indices[-1] + tactus_tau
    #     print('next_beat_pred',next_beat_pred)
        performance_region = trimmed_odf[next_beat_pred - performance_deviation: next_beat_pred + performance_deviation+1]
        onset_locations = np.where(performance_region != 0)[0]
        if len(onset_locations)>0:
            missing_onsets = 0
            # find out which location is closer to 5
            closest_to_centre = performance_deviation + 2
            for onset in onset_locations:
                if abs(onset - performance_deviation) < closest_to_centre:
                    closest_index = onset
                    closest_to_centre = abs(onset - performance_deviation)
            offset = closest_index - performance_deviation
            adjusted_beat_pred = next_beat_pred + offset
        else:
            # check the window tollerance for tempo change
            tempo_change_region = trimmed_odf[next_beat_pred - tactus_inc_tol: next_beat_pred + tactus_dec_tol+1] 
            onset_locations = np.where(tempo_change_region != 0)[0]
            if len(onset_locations)>0:
                missing_onsets = 0
                closest_to_centre = tactus_dec_tol + 2 # just to start off
                for onset in onset_locations:
                    centered_index = tactus_dec_tol-tactus_inc_tol
                    if abs(centered_index - onset) < closest_to_centre:
                        closest_index = onset
                        closest_to_centre = abs(centered_index-onset)
    #             pdb.set_trace()
                offset = closest_index - centered_index
                adjusted_beat_pred = next_beat_pred + offset
                tactus_tau = round((alpha*(tactus_tau + offset)) + ((1 - alpha)*tactus_tau))
            else:
                # assign a beat to default place
                missing_onsets += 1
                adjusted_beat_pred = next_beat_pred
    #     print('adjusted_beat_pred',adjusted_beat_pred)
        beat_indices.append(adjusted_beat_pred)


    # if last onsets were missed, remove them
    beat_indices = beat_indices[:len(beat_indices)-missing_onsets]

    missing_onsets = 0
    no_more_anacrusis = False
    while beat_indices[0] - tactus_tau > 0 and no_more_anacrusis == False:
    #     pdb.set_trace()
        previous_beat_pred = beat_indices[0] - tactus_tau
        anacrusis_region = trimmed_odf[previous_beat_pred-performance_deviation: previous_beat_pred+performance_deviation]
        onset_locations = np.where(anacrusis_region != 0)[0]
        if len(onset_locations)>0:
            # find out which location is closer to 5
            closest_to_centre = performance_deviation + 2
            for onset in onset_locations:
                if abs(onset - performance_deviation) < closest_to_centre:
                    closest_index = onset
                    closest_to_centre = abs(onset - performance_deviation)
            offset = closest_index-performance_deviation
    #         pdb.set_trace()
            adjusted_beat_pred = previous_beat_pred + offset
            beat_indices.insert(0, adjusted_beat_pred)
        else:
            no_more_anacrusis = True
    beat_indices = beat_indices[missing_onsets:]
    return beat_indices

def beatTracker(input_file='Albums-Cafe_Paradiso-14.wav'):
    f_path = './' +input_file
    odf = get_all_odf(f_path, winSize_secs, hopSize_secs)
    # FILTERING AND PROCESSING OF THE ODFs
    medFiltODF = odf - medfilt(odf, wd)
    for row in medFiltODF:
        maxFiltEntry = maxfilt(medFiltODF, wd, mode='nearest', axis=0)
        try:
            np.concatenate((maxFiltODF, maxFiltEntry), axis=0)
        except:
            maxFiltODF = maxFiltEntry
    maxFiltODF = maxfilt(medFiltODF, wd, mode='nearest', axis=1)
    maxFiltODF = np.gradient(maxFiltODF, axis=1)
    maxFiltODF = np.divide(np.subtract(maxFiltODF, np.mean(maxFiltODF)), np.std(maxFiltODF))
    maxFiltODF = normalize(maxFiltODF)
    # removing any artefacts causing false positives at the very start or end - this would not interefere with any beates
    maxFiltODF[:,:3] = maxFiltODF[:,3000:] = 0
    # FILL THRESHOLD ARRAY WITH VALUES THAT EXCEED EACH THRESHOLF
    threshold_array = np.ones(maxFiltODF.shape) * threshold
    for row_idx in range(len(maxFiltODF)):
        for col_idx in range(len(maxFiltODF[row_idx])):
            threshold_array[row_idx][col_idx] = max(maxFiltODF[row_idx][col_idx], threshold[row_idx]) 
    # PROCESS ODF
    trimmed_odf, onset_peak_indices = process_odf(selected_odf, maxFiltODF, threshold_array)
    # AUTOCORRELATION TO DETERMINE TEMPO
    tactus_tau = acfTempo(trimmed_odf, tau_limits)
    first_beat_index = find_first_beat(trimmed_odf, tactus_tau, onset_peak_indices)
    beat_indices = find_beats(trimmed_odf, tactus_tau, first_beat_index)
    beat_indices = np.asarray(beat_indices) - final_offset
    beat_times = beat_indices * hopSize_secs
    return beat_times


# Single line Implementation of beatTracker
Insert filepath in parantheses for custom analysis

In [4]:
beats = beatTracker()
beats

array([ 1.2 ,  1.91,  2.61,  3.32,  4.02,  4.71,  5.4 ,  6.1 ,  6.79,
        7.51,  8.19,  8.9 ,  9.61, 10.28, 10.98, 11.68, 12.39, 13.1 ,
       13.79, 14.49, 15.17, 15.88, 16.58, 17.28, 17.96, 18.67, 19.37,
       20.08, 20.78, 21.48, 22.18, 22.88, 23.57, 24.28, 25.02, 25.69,
       26.36, 27.07])