# Beat-Upbeat Ratio Distributions

## Import dependencies, set constants etc.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.signal as signal
import scipy.stats as stats
import statsmodels.formula.api as smf

from src import utils
from src.detect.detect_utils import OnsetMaker
from src.features.rhythm_features import BeatUpbeatRatio
from src.visualise.bur_plots import *

In [None]:
# These variables are used for the optimization process
SEED = 42
N_FOLDS = 5
N_JOBS = -1
N_BOOT = 999

In [None]:
# Upper and lower bounds to use when thresholding BURs
BUR_UPPER = 4.0
BUR_LOWER = 0.25

In [None]:
# Set the seed in NumPy for consistent results across function calls
np.random.seed(SEED)

## Load in data
We start by loading in our onset data. This gives us a list of `OnsetMaker` classes (defined in `src\detect\detect_utils.py`).

In [None]:
onsets: list[OnsetMaker] = utils.unserialise_object(fr'{utils.get_project_root()}\models\matched_onsets_corpus_chronology')

In [None]:
res = []
# Iterate through each track
for num, track in enumerate(onsets, 1):
    print(f'{num} / {len(onsets)}')
    # Convert the summary dictionary (dictionary of arrays) to a dataframe
    summary_dict = pd.DataFrame(track.summary_dict)
    # Iterate through each instrument
    for instr in utils.INSTRUMENTS_TO_PERFORMER_ROLES.keys():
        # Subset to get my onsets and partner onsets as separate dataframes
        my_onsets = track.ons[instr]
        my_beats = summary_dict[instr]
        # Extract BURs using our feature class
        bm = BeatUpbeatRatio(my_onsets=my_onsets, my_beats=my_beats, clean_outliers=False)
        # Iterate through every log BUR by that musician
        for bur in bm.bur_log['burs'].dropna().values:
            # Append a new dictionary
            res.append(dict(
                mbz_id=track.item['mbz_id'],
                bur=bur,
                instrument=instr,
                tempo=track.tempo,
                bandleader=track.item['pianist']
            ))
burs = pd.DataFrame(res)

Before we do any cleaning, get the total number of beat-upbeat ratios

In [None]:
total_no_clean = burs[burs['instrument'] == 'piano'].shape[0]
print(total_no_clean)

## Clean data
We drop BUR values lower than 0.25 and higher than 4 (see Corcoran & Frieler, 2021)

In [None]:
burs = burs[(burs['bur'] > np.log2(BUR_LOWER)) & (burs['bur'] < np.log2(BUR_UPPER))]

Now we can get the total number of BURs after cleaning

In [None]:
total_after_clean = burs[burs['instrument'] == 'piano'].shape[0]
print(total_after_clean)
print(1 - (total_after_clean / total_no_clean))

We can also get the number of BURs per instrument

In [None]:
burs.groupby('instrument')['bur'].count()

In [None]:
np.exp(burs['bur'].mean())

In [None]:
burs.groupby('instrument')['bur'].mean()

In [None]:
burs.groupby('instrument')['bur'].mean().apply(np.exp)

In [None]:
means = []
pno = burs[burs['instrument'] == 'piano']
for state in range(10000):
    if state % 10 == 0:
        print(state)
    bls = pd.Series(pno['bandleader'].unique()).sample(frac=1, replace=True, random_state=state)
    means.append(pd.concat(pno[(pno['bandleader'] == bl)]['bur'] for bl in bls).mean())
print(np.percentile(means, 2.5), np.percentile(means, 97.5))

Now we order our dataframe so that the instruments are in the correct order (piano -> bass -> drums)

In [None]:
burs = (
    burs.set_index('instrument')
    .loc[utils.INSTRUMENTS_TO_PERFORMER_ROLES.keys()]
    .reset_index(drop=False)
)

In [None]:
jm = burs[(burs['instrument'] == 'piano')]['bur']
means = [jm.sample(frac=1, replace=True, random_state=i).mean() for i in range(10000)]
print(np.quantile(means, 0.025), np.quantile(means, 0.975))

In [None]:
np.log2(BUR_LOWER)

## Plot the average BUR per instrument

In [None]:
BarPlotBUR(burs).create_plot()
plt.show()

## Plot BUR distribution per performer

In [None]:
ViolinPlotBURs(burs, include_images=False).create_plot()
plt.show()

## Compute the KDE and extract peaks

In [None]:
def get_peaks(data, len_data: int = 1000, **kwargs) -> np.ndarray:
    """Fits a kernel-density estimate to BUR data and extracts BUR peaks"""
    # Fit the actual KDE to the data, using the default parameters
    kde = stats.gaussian_kde(data.T, bw_method='silverman')
    # Create a linear space of integers ranging from our lowest to our highest BUR
    data_plot = np.linspace(data.min(), data.max(), len_data)[:, np.newaxis]
    # Evaluate the KDE on our linear space of integers
    kde_eval = kde.evaluate(data_plot.T)
    # Find the peaks from our fitted KDE
    peaks, _ = signal.find_peaks(kde_eval, **kwargs)
    # Return the sorted peaks from our KDE: this will be an array of BUR values
    return np.sort(data_plot[peaks].flatten())

In [None]:
from joblib import Parallel, delayed
import src.visualise.visualise_utils as vutils

def bootstrap_peaks(data: np.array, actual_peaks: np.array, tol: float = 0.5) -> dict:
    """Bootstrap confidence intervals for an array of peaks"""
    def _boot(state, actual_peak) -> float:
        """Bootstrapping function for an individual peak"""
        # Set the random seed
        np.random.seed(state)
        # Take a random sample of our BURs, with replacement, and reshape
        boot = np.random.choice(data, replace=True, size=size).reshape(-1, 1)
        # Get the peaks for our bootstrapped sample
        boot_peaks = np.array(get_peaks(boot))
        # Get the distances between each bootstrapped peak and the actual peak
        distance = np.abs(np.unique(boot_peaks) - actual_peak)
        # Iterate through the peaks by minimum distance to actual peak
        for boot_peak in boot_peaks[np.argsort(distance)]:
            # If the distance to the peak is less than the tolerance
            if abs(boot_peak - actual_peak) <= tol:
                # We can return this bootstrapped peaks
                return boot_peak
        # Otherwise, if we have no matches, return NaN
        return np.nan

    # We perform a few operations here, so we don't have to re-do them every loop
    size = len(data)
    data = data.flatten()
    # For every peak, get all bootstrapped peaks
    boot_res = {peak: [Parallel(n_jobs=-1, verbose=10)(delayed(_boot)(st, peak) for st in range(vutils.N_BOOT))] for peak in actual_peaks}
    # Return a dictionary with confidence intervals for each peak
    return {k: [np.nanpercentile(v, 2.5), np.nanpercentile(v, 97.5)] for k, v in boot_res.items()}

In [None]:
res_ = []
for instr, grp in burs.groupby('instrument', sort=False):
    X = grp['bur'].to_numpy().reshape(-1, 1)
    grp_peaks = get_peaks(X)
    ci_peaks = bootstrap_peaks(X, grp_peaks)
    for num, (actual, (low, high)) in enumerate(ci_peaks.items()):
        res_.append(dict(
            instrument=instr,
            peak_num=num,
            peak=actual,
            low=low,
            high=high
        ))
peaks_df = pd.DataFrame(res_)

In [None]:
print(peaks_df)

## Estimate density of distribution between peaks

In [None]:
def estimate_density(peaks, burs):
    # Peak 1
    p1 = burs[burs <= (peaks[1] + peaks[0]) / 2]
    p1_len = (len(p1) / len(burs)) * 100
    # Peak 2
    p2 = burs[burs > (peaks[1] + peaks[0]) / 2]
    p2_len = (len(p2) / len(burs)) * 100
    return p1_len, p2_len


bass_peaks = peaks_df[peaks_df['instrument'] == 'bass']['peak'].sort_values().values
bass_weight = estimate_density(
    bass_peaks,
    burs[burs['instrument'] == 'bass']['bur']
)
print(bass_weight)
# drums_peaks = peaks_df[peaks_df['instrument'] == 'drums']['peak'].sort_values().values
# drums_weight = estimate_density(
#     drums_peaks,
#     burs[burs['instrument'] == 'drums']['bur']
# )
# print(drums_weight)

In [None]:
bandleaders = pd.Series(burs['bandleader'].unique())
bandleaders_sample = [bandleaders.sample(replace=True, frac=1) for i in range(1000)]
for instr, instr_peaks in zip(['bass'], [bass_peaks]):
    boot_low, boot_high = [], []
    for n, sample in enumerate(bandleaders_sample):
        print(n)
        data = []
        for _, bandleader in sample.items():
            data.extend(burs[(burs['bandleader'] == bandleader) & (burs['instrument'] == instr)]['bur'].to_list())
        low, high = estimate_density(instr_peaks, pd.Series(data),)
        boot_low.append(low)
        boot_high.append(high)
    print(instr, 'low_peak', stats.sem(boot_low) * 1.96)
    print(instr, 'high_peak', stats.sem(boot_high) * 1.96)

## Plot the BUR distribution with density curve and peaks

In [None]:
HistPlotBURByInstrument(burs, peaks_df).create_plot()
plt.show()

## Model average tempo vs BUR

In [None]:
average = burs.groupby(['instrument', 'mbz_id']).agg(dict(bur=['mean', 'count'], tempo='median', bandleader='first')).reset_index(drop=False)
average.columns = ['_'.join(col).strip() for col in average.columns.values]
print(len(average[average['bur_count'] < 15]))
average = average[average['bur_count'] > 15]


In [None]:
average['tempo_standard'] = (average['tempo_median'] - average['tempo_median'].mean()) / average['tempo_median'].std()

In [None]:
print(average['tempo_median'].mean())
print(average['tempo_median'].std())

In [None]:
md = smf.mixedlm(
    "bur_mean ~ tempo_standard * C(instrument_, Treatment(reference='piano'))",
    data=average,
    groups=average['bandleader_first'],
    re_formula="0 + tempo_standard + C(instrument_, Treatment(reference='piano'))"
).fit()
print(md.summary())

In [None]:
(md.params['tempo_standard'] / average['tempo_median'].std()) * 10

In [None]:
# Variance explained by the fixed effects: we need to use md.predict() with the underlying data to get this
var_fixed = md.predict().var()
# Variance explained by the random effects
var_random = float(md.cov_re.to_numpy().mean())
# Variance of the residuals
var_resid = md.scale
# Total variance of the model
total_var = var_fixed + var_random + var_resid
# Calculate the r2 values and append to the model
print('conditional_r2:', (var_fixed + var_random) / total_var)
print('marginal_r2:', var_fixed / total_var)

In [None]:
stddev = np.std([v.iloc[0] for v in md.random_effects.values()])
print('stdev of bandleader groups:', stddev)

## Plot average BUR vs tempo

In [None]:
RegPlotBURTempo(burs).create_plot()
plt.show()