In [1]:
import sys
sys.path.insert(1, 'my_utils')

In [2]:
import numpy as np
import os
import yaml
from matplotlib import pyplot as plt
from scipy.signal import medfilt as medfilt
from scipy import signal
import csv


In [8]:
# converts a csv file to a python list of rows
def csvfile_to_list(fp):
    f = open(fp)
    reader = csv.reader(f)
    _ = next(reader)
    list_of_rows = [row for row in reader]
    return list_of_rows


# returns a list of filepaths collected from a parent directory and all subdirectories
def recursive_file_retrieval(parent_path, ignore_hidden_dirs=False, return_parent=True):
    
    file_path_list = []
    dir_list = []
    parent_paths = [parent_path]

    more_subdirs = True
    while more_subdirs == True:
        subdir_paths = []
        for i, parent_path in enumerate(parent_paths):
            # print(parent_path)
            if ignore_hidden_dirs:
                if os.path.basename(parent_path).startswith('.'):
                    continue

            dir_list.append(parent_path)
            r,dirs,files = next(os.walk(parent_path, topdown=True, onerror=None, followlinks=False)) 
            for f in files:
                file_path_list.append(os.path.join(r,f))

            # if there are more subdirectories
            if len(dirs) != 0:
                for d in dirs:
                    subdir_paths.append(os.path.join(r,d))

            # if we've finished going through subdirectories (each parent_path), stop that loop
            if i == len(parent_paths)-1:
                # if loop about to finish, change parent_paths content and restart loop
                if len(subdir_paths) != 0:
                    parent_paths = subdir_paths
                else:
                    more_subdirs = False
    
    if not return_parent: dir_list = dir_list[1:]

    return dir_list, file_path_list


def plot_ts(ts, hop_duration=0.016):
    time = np.arange(len(ts))*hop_duration

    # Plot the time series
    plt.plot(time, ts, label='Time Series')
    plt.xlabel('Time')
    plt.ylabel('Values')
    plt.title('Time Series Example')
    plt.legend()
    plt.grid(True)
    plt.show()
    plt.close()


def normalise(arr):
    mx = arr.max() # removed axis=1
    for i in range(mx.shape[0]):
        if mx[i] > 0:
            arr[i,:] /= mx[i]
    return arr


def tau_to_tempo(tau, hop=0.016, output='bpm'):
    period = tau * hop
    if output == 'bpm':
        return period_to_bpm(period)
    else:
        return period


def period_to_bpm(period):
    return 60/period


def beats_from_beatfile(fp):
    csv_rows = csvfile_to_list(fp)
    ts_beats = []
    for r in csv_rows:
        entry = r[0]
        entries = entry.split('\t')
        timestamp, beat = float(entries[0]), int(entries[1])
        ts_beats.append((timestamp, beat))
    return ts_beats


def temp_from_beatsfile(fp, output='bpm', summation='mean'):
    ts_beats = beats_from_beatfile(fp)
    timestamps = np.array([ts for ts, b in ts_beats])
    if summation == 'mean':
        mean_period = np.mean(np.diff(timestamps))
    elif summation == 'median':
        mean_period = np.median(np.diff(timestamps))
    if output == 'bpm':
        return round(period_to_bpm(mean_period))
    elif output == 'period':
        return mean_period
    

# get several onset arrays using several dsp approaches
def get_onset_arrs(specs):
   
    # get rms values
    rms_values = np.sqrt(np.mean(specs**2, axis=1))
    dim = specs.shape[1]
    # get high frequency content measure
    hfc_muliplier = np.asarray(range(dim))+1
    hfc = np.mean((specs * hfc_muliplier), axis=1)
    # get spectral flux
    delta_spectrogram = np.diff(specs, axis=0)
    delta_spectrogram = np.concatenate((np.zeros((1,dim)), delta_spectrogram))
    mask = delta_spectrogram > 0
    delta_spec_up = delta_spectrogram * mask
    spec_flux = np.sum(delta_spec_up, axis=1)

    return rms_values, hfc, spec_flux
    

# Clean up onset arrays: Smoothen, normalise, denoise, peak-isolation
def cleanup_onset_array(ts, mf_k=3, threshold=0.5, autocor_peak_dist=5):
    med_ts = ts - medfilt(ts, mf_k)
    med_grad_ts = np.gradient(med_ts)
    stdise_med_grad_ts = (med_grad_ts - np.mean(med_grad_ts)) / np.std(med_grad_ts)
    min_max_med_grad_ts = med_grad_ts / np.max(med_grad_ts)
    normmed_med_grad_ts = stdise_med_grad_ts
    thresholded_ts = normmed_med_grad_ts.copy()
    thresholded_ts[thresholded_ts < threshold] = 0
    onset_peak_indices, peak_value_dict = signal.find_peaks(thresholded_ts, height=0, distance=autocor_peak_dist)
    peak_values = peak_value_dict['peak_heights']
    trimmed_odf = np.zeros_like(thresholded_ts)
    trimmed_odf[onset_peak_indices] = peak_values
    return trimmed_odf


# get tempo candidantes from onset array
def get_tempo(onsets, tau_limits):
    autocor = np.correlate(onsets, onsets, mode='full')[int(len(np.correlate(onsets, onsets, mode='full'))//2)+1:]
    acf_peak_indices, acf_peak_value_dict = signal.find_peaks(autocor, height=np.max(autocor)/2, distance=5)
    acf_peak_values = acf_peak_value_dict['peak_heights']
    peaked_autocor = np.zeros_like(autocor)
    peaked_autocor[acf_peak_indices] = acf_peak_values
    realistic_autocor_cands = peaked_autocor[tau_limits[1]:tau_limits[0]]
    best_tau_samples = np.argpartition(realistic_autocor_cands, -4)[-4:]
    return [round(tau_to_tempo(t)) for t in best_tau_samples]


def beatsfp_from_featsfp(feats_fp, beats_dir):
    fn = os.path.basename(feats_fp)
    genre, id = fn.split('.')[:2]
    beats_basename = 'gtzan_' +genre +'_' + id +'.beats'
    beats_fp = os.path.join(beats_dir, beats_basename)
    return beats_fp

In [9]:
feats_dir = '/Users/brendanoconnor/Desktop/career/tempoEstimationTask/stored_feats'
beats_dir = '/Users/brendanoconnor/Downloads/MLExcerciseBrendan/data/GTZAN/beats'
threshold = 0.5
bpm_limits = [50, 250]
mf_k=3
ofd_idx = 1

In [10]:
with open(os.path.join(feats_dir, "feat_params.yaml"), "rb") as handle:
    feat_params = yaml.load(handle, Loader=yaml.FullLoader)

print(feat_params)

# onset detection function generators
hop_duration = feat_params['hop_size'] / feat_params['sr']
dim_size = feat_params['num_mels']
tau_limits = [round(1 / (bpm / 60 * hop_duration)) for bpm in bpm_limits]

{'fft_size': 1024, 'fmax': 7600, 'fmin': 90, 'hop_size': 256, 'num_mels': 80, 'sr': 16000}


### Collect premade features and derive onsets from them in several ways

In [11]:
_, fps = recursive_file_retrieval(feats_dir)
filt_fps = [fp for fp in fps if not fp.startswith('.') and fp.endswith('npy')]

fps_onsets = []
feats_fp = '/Users/brendanoconnor/Desktop/career/tempoEstimationTask/stored_feats/pop/pop.00000.wav.npy'

for feats_fp in filt_fps:
    feats = np.load(feats_fp)
    fps_onsets.append((feats_fp, get_onset_arrs(feats)))



In [12]:

for fp_onset in fps_onsets:
    fp, onsets = fp_onset[0], fp_onset[1]
    onset_arr = onsets[ofd_idx]
    cleanedup_onset_arr = cleanup_onset_array(onset_arr)
    bpms = get_tempo(cleanedup_onset_arr, tau_limits)
    beats_fp = beatsfp_from_featsfp(fp, beats_dir)
    print(temp_from_beatsfile(beats_fp, summation='mean'), bpms)

67 [64, 69, 312, 144]
94 [87, 938, 114, 163]
122 [110, 83, 1250, 250]
