In [24]:
import librosa
import numpy as np
import pickle
import pandas as pd
import random
import os
import matplotlib.pyplot as plt
from scipy.stats import entropy, mode
from scipy.signal import hann, fftconvolve
from scipy.fftpack import fft

EPS = 0.0000008

def adaptive_threshold(data):
    sz = len(data)
    if sz == 0:
        return data

    smoothed = np.copy(data)
    p_pre = 8
    p_post = 7

    for i in range(sz):
        first = max(0, i - p_pre)
        last = min(sz - 1, i + p_post)
        smoothed[i] = np.mean(data[first:last + 1])

    for i in range(sz):
        data[i] -= smoothed[i]
        if data[i] < 0.0:
            data[i] = 0.0
    return data


def estimate_time_signature(tempo, beat_frames, sr, hop_length):
    """
    Estimates the time signature based on the beat times.

    Args:
        beat_times (np.ndarray): The beat times in seconds.
        sr (int): The sampling rate of the audio signal.
        hop_length (int): The hop length used to compute the beat times.

    Returns:
        int: The estimated time signature.
    """
    # Compute the inter-beat intervals (IBIs) in frames
    ibis_frames = np.diff(beat_frames)

    # Quantize the IBIs to the nearest beat period
    beat_period_frames = int(round(60 * sr / tempo / hop_length))
    quantized_ibis = np.round(ibis_frames / beat_period_frames) * beat_period_frames

    # Calculate the mode of the quantized IBIs
    time_signature = int(mode(quantized_ibis / beat_period_frames)[0][0])

    return time_signature

def find_downbeats(audio, beats, hop_length=512, factor=1, beatframesize=2048, bpb=4):
    if len(audio) == 0:
        return []

    oldspec = np.zeros(beatframesize // 2)
    beatsd = []

    for i in range(len(beats) - 1):
        beatstart = int((beats[i] * hop_length) // factor)
        beatend = int((beats[i + 1] * hop_length) // factor)
        if beatend >= len(audio):
            beatend = len(audio) - 1
        if beatend < beatstart:
            beatend = beatstart

        beatlen = beatend - beatstart
        beatframe = audio[beatstart:beatend] * hann(beatlen)

        # Calculate the padding amount and apply the padding
        pad_amount = max(0, beatframesize - len(beatframe))
        beatframe = np.pad(beatframe, (0, pad_amount), mode='constant')

        fft_out = fft(beatframe)
        newspec = np.abs(fft_out[:beatframesize // 2])

        newspec = adaptive_threshold(newspec)

        if i > 0:
            beatsd.append(measure_spec_diff(oldspec, newspec))

        oldspec = np.copy(newspec)

    timesig = bpb if bpb != 0 else 4
    dbcand = np.zeros(timesig)

    for beat in range(timesig):
        count = 0
        for example in range(beat - 1, len(beatsd), timesig):
            if example < 0:
                continue
            dbcand[beat] += (beatsd[example]) / timesig
            count += 1
        if count > 0:
            dbcand[beat] /= count

    dbind = np.argmax(dbcand)
    downbeats = [i for i in range(dbind, len(beats), timesig)]

    return downbeats

def measure_spec_diff(oldspec, newspec):
    # JENSEN-SHANNON DIVERGENCE BETWEEN SPECTRAL FRAMES
    SPECSIZE = 512  # ONLY LOOK AT FIRST 512 SAMPLES OF SPECTRUM.
    if SPECSIZE > len(oldspec) // 4:
        SPECSIZE = len(oldspec) // 4
    
    SD = 0.0

    # Add EPS to avoid taking log of 0
    newspec = newspec[:SPECSIZE] + EPS
    oldspec = oldspec[:SPECSIZE] + EPS
    
    sumnew = np.sum(newspec)
    sumold = np.sum(oldspec)
    
    # Normalize the spectra
    newspec /= sumnew
    oldspec /= sumold
    
    # Replace any remaining zeros with ones (after normalization, this shouldn't happen, but just in case)
    newspec = np.where(newspec == 0, 1.0, newspec)
    oldspec = np.where(oldspec == 0, 1.0, oldspec)
    
    # Jensen-Shannon calculation
    m = 0.5 * (oldspec + newspec)
    SD = np.sum(-m * np.log(m) + 0.5 * (oldspec * np.log(oldspec)) + 0.5 * (newspec * np.log(newspec)))
    
    return SD

def load_pickle(file_path):
    with open(file_path, 'rb') as f:
        return pickle.load(f)

In [7]:
X_test = load_pickle('../data/pkl/test_data.pkl')
random.seed(31)
random_song_id = int(random.choice(list(X_test.keys())))
audio_file = f'../data/audio_files/processed/{random_song_id}.mp3'
df = pd.read_csv('../data/dataframes/clean_labeled.csv')
data = df.loc[df['SongID'] == random_song_id]

# Load audio file
y, sr = librosa.load(audio_file, sr=None)
y = librosa.resample(y=y, orig_sr=sr, target_sr=44100)
sr = 44100
hop_length = 128
y_harm, y_perc = librosa.effects.hpss(y)
onset_env = librosa.onset.onset_strength(y=y_perc, sr=sr, hop_length=hop_length)

# Extract audio features
duration = librosa.get_duration(y=y, sr=sr)

tempo, beats = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr, hop_length=hop_length)
time_signature = estimate_time_signature(tempo, beats, sr, hop_length)

  time_signature = int(mode(quantized_ibis / beat_period_frames)[0][0])


In [None]:
# Compute beat frames
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)

# Compute mel spectrogram for each beat frame
S = librosa.feature.melspectrogram(y=y, sr=sr)

# Define the cost function for downbeats (to be implemented)
def cost_function(beat_frame):
    # This function should return the cost of a beat being a downbeat
    pass

# Initialize the dynamic programming matrices
n_beats = len(beat_frames)
cost_matrix = [[float('inf')] * n_beats for _ in range(n_beats)]
back_pointer = [[None] * n_beats for _ in range(n_beats)]

# Base case: the first beat has a cost based on the cost function
cost_matrix[0][0] = cost_function(beat_frames[0])

# Fill in the dynamic programming matrix
for i in range(1, n_beats):
    for j in range(i):
        # Calculate the cost of the current beat being a downbeat
        cost = cost_function(beat_frames[i])
        # Update the cost matrix and back-pointer if this path is better
        if cost_matrix[i-1][j] + cost < cost_matrix[i][i]:
            cost_matrix[i][i] = cost_matrix[i-1][j] + cost
            back_pointer[i][i] = j
        # Update the cost of continuing without a downbeat
        cost_matrix[i][j] = cost_matrix[i-1][j]

# Find the end of the most likely downbeat sequence
min_cost = min(cost_matrix[-1])
end_index = cost_matrix[-1].index(min_cost)

# Backtrack to find the most likely sequence of downbeats
downbeats = []
current_index = end_index
while current_index is not None:
    downbeats.append(current_index)
    current_index = back_pointer[current_index][current_index]

# Reverse the downbeats list since we backtracked
downbeats = downbeats[::-1]

# Convert beat frame indices to time stamps
downbeat_times = [beat_frames[i] for i in downbeats]

In [None]:
def downbeat_track_dp(local_downbeat_score, period, tightness):
    backlink = np.zeros_like(local_downbeat_score, dtype=int)
    cumscore = np.zeros_like(local_downbeat_score)

    # Calculate the search range to consider for previous downbeats, which could be longer than for beats
    window = np.arange(-2 * period, -np.round(period / 2) + 1, dtype=int)

    # Calculate the tightness weighting
    if tightness <= 0:
        raise ParameterError("tightness must be strictly positive")
    txwt = -tightness * (np.log(-window / period) ** 2)

    first_downbeat = True
    for i, score_i in enumerate(local_downbeat_score):
        z_pad = np.maximum(0, min(-window[0], len(window)))
        candidates = txwt.copy()
        candidates[z_pad:] = candidates[z_pad:] + cumscore[window[z_pad:]]

        best_predecessor = np.argmax(candidates)
        cumscore[i] = score_i + candidates[best_predecessor]

        if first_downbeat and score_i < 0.01 * local_downbeat_score.max():
            backlink[i] = -1
        else:
            backlink[i] = window[best_predecessor]
            first_downbeat = False

        window = window + 1

    return backlink, cumscore

def downbeat_tracker(
    onset_envelope: np.ndarray, bpm: float, fft_res: float, tightness: float, trim: bool
) -> np.ndarray:
    """Tracks downbeats in an onset strength envelope.

    Parameters
    ----------
    onset_envelope : np.ndarray [shape=(n,)]
        onset strength envelope
    bpm : float [scalar]
        tempo estimate, to establish the beat period
    fft_res : float [scalar]
        resolution of the FFT (sr / hop_length)
    tightness : float [scalar]
        how closely do we adhere to bpm?
    trim : bool [scalar]
        trim leading/trailing downbeats with weak onsets?

    Returns
    -------
    downbeats : np.ndarray [shape=(n,)]
        frame numbers of downbeat events
    """
    if bpm <= 0:
        raise ParameterError("bpm must be strictly positive")

    # convert bpm to a sample period for searching
    # assuming 4 beats per measure, downbeat period is 4 times the beat period
    period = round(60.0 * fft_res / bpm) * 4

    # localscore is a version of AGC'd onset envelope emphasizing downbeat characteristics
    localscore = downbeat_local_score(onset_envelope, period)

    # run the DP
    backlink, cumscore = downbeat_track_dp(localscore, period, tightness)

    # get the position of the last downbeat
    downbeats = [last_beat(cumscore)]  # This function might need to be adapted for downbeats

    # Reconstruct the downbeat path from backlinks
    while backlink[downbeats[-1]] >= 0:
        downbeats.append(backlink[downbeats[-1]])

    # Put the downbeats in ascending order and convert into an array of frame numbers
    downbeats = np.array(downbeats[::-1], dtype=int)

    # Discard spurious trailing downbeats
    downbeats = trim_beats(localscore, downbeats, trim)  # This function might need to be adapted for downbeats

    return downbeats