# Package Installation & Preparations

In [1]:
from google.colab import drive
drive.mount('/content/drive/')
import os
os.chdir('/content/drive/MyDrive/Data_Colab/MIR')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [2]:
!pip install mir_eval

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import librosa
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import itertools
import soundfile as sf
import math
from math import sin, cos, radians
from itertools import repeat
import pandas as pd
import pickle
import mir_eval
import scipy.linalg
from typing import List
from scipy.stats import zscore
from scipy.signal import argrelmax
import scipy.signal as signal
import numpy as np
from dataclasses import dataclass
from prettytable import PrettyTable
from scipy.fftpack import ifft
from librosa.util import normalize
# import librosa.display

In [4]:
import warnings
warnings.filterwarnings("ignore")

In [5]:
# measures the changes in spectral content over time
def spectral_flux(data, sr, hop_size, window_size, g, mean_size, lag=1):
    x = librosa.stft(data, n_fft=window_size, hop_length=hop_size)
    
    #time value t
    t = np.arange(x.shape[1]) * hop_size / sr
    # Compute the magnitude spectrum and the spectral flux
    y = np.log(1 + g * np.abs(x))
    spflx = np.maximum(0., y[:, lag:] - y[:, :-lag]).mean(axis=0)
    # Compute the time values for the enhanced spectral flux
    t2 = (t[lag:] + t[:-lag]) / 2

    # Post-processing: Apply mean filter and enhance the spectral flux
    mean_filter = np.ones(mean_size) / mean_size
    u = signal.fftconvolve(spflx, mean_filter, 'same')
    spfx_enhance = np.maximum(0., spflx - u)
    spfx_enhance = spfx_enhance / spfx_enhance.max()
    return t2, spfx_enhance

In [6]:
def tempo_estimation(freq_scale, tempogram):
    tempo_vector = np.sum(tempogram, axis=1)
    peak_idx = argrelmax(tempo_vector)
    
    # Sort the peaks based on their magnitude (tempo_vector) in descending order
    peaks = sorted(zip(tempo_vector[peak_idx], freq_scale[peak_idx]), key=lambda x: x[0], reverse=True)
    
    # Select the two highest peaks and arrange the peaks in ascending order
    pack = peaks[:2]
    if pack[0][1] > pack[1][1]:
        pack = pack[::-1]
    
    # Compute the relative saliency
    s1 = pack[0][0] / (pack[0][0] + pack[1][0])
    return pack[0][1], pack[1][1], s1


By applying the weighted sum, the function amplifies the contributions of the harmonic components in the tempogram, emphasizing the harmonic information and potentially improving the accuracy of tempo estimation algorithms that rely on harmonic features.

In [7]:
# weighted sum of the tempogram columns to enhance the harmonic structure.
def harmonic_sum_tempogram(freq_scale, tempogram, harms, alpha):
    # Get parts of tempogram
    tpg = tempogram[:len(freq_scale) // harms, :]
    weights = alpha ** (-np.arange(1, harms))
    
    for i in range(1, tpg.shape[0]):
        stop = i * (harms + 1)
        weights_slice = weights[:, np.newaxis]
        tempogram_slice = tempogram[2 * i:stop:i]
        tpg[i] += np.sum(tempogram_slice * weights_slice, axis=0)
    
    return freq_scale[:tpg.shape[0]], tpg

In [8]:
# short term acf (autocorrelation function)
def st_acf(data, sr, window_size, hop_size):
    if hop_size:
        noverlap = window_size - hop_size
    else:
        noverlap = None
    _, t, x = signal.stft(data, sr, nperseg=window_size, noverlap=noverlap, return_onesided=False)
    acf = ifft(np.abs(x) ** 2, axis=0).real
    acf = acf[:acf.shape[0] // 2 + 1]
    lag = np.arange(acf.shape[0]) / sr
    return lag, t, acf

In [9]:
def p_score(g, t1, t2, s1): #g=reference
    T1 = 0
    T2 = 0
    if abs(g - t1) / g <= 0.08:
        T1 = 1
    if abs(g - t2) / g <= 0.08:
        T2 = 1
    return (s1 * T1) + ((1-s1) * T2)

def ALOTC_SCORE(g, t1, t2):
    T1, T2 = 0, 0
    if abs(g - t1) / g <= 0.08:
        T1 = 1
    if abs(g - t2) / g <= 0.08:
        T2 = 1
    if T1 or T2: return 1
    else: return 0

In [10]:
path = './Ballroom/BallroomData/'
genres = ['ChaCha', 'Jive', 'Quickstep', 'Rumba', 'Samba', 'Tango', 'Viennese waltz', 'Waltz']
window_size = 2048
lw_sr = 100
g = 1
mean_size = 25

# Question 1 & 2

Fourier Tempogram

In [13]:
def tempogram_fourier(nv_curve, sr, window_size=2, hop_size=128):
    stft = librosa.stft(nv_curve, n_fft=window_size, hop_length=hop_size)
    tempogram = normalize(np.abs(stft))
    freq_scale = np.arange(window_size // 2 + 1) * sr / window_size * 60
    return freq_scale, tempogram

In [14]:
for window_size in [800, 2000, 2400]:#
    print('Running with window_length', window_size)
    table = PrettyTable(["Genre", "P-score", "ALOTC"])
    ratio_list = []
    for genre in genres:
        # print('running', genre)
        score = []
        ratio = []
        dir = os.path.join(path, genre)
        files = os.listdir(dir)
        for file_name in files:
            file = os.path.join(dir, file_name)
            s, sr = librosa.load(file, sr=None)
            hop_size = sr // lw_sr
            t, spfx_enhance = spectral_flux(s, sr, hop_size, window_size=1024, g=g, mean_size=mean_size, lag=1)
            f, tpg = tempogram_fourier(spfx_enhance, lw_sr, window_size=window_size, hop_size=hop_size)#hop_size=50
            # f, tpg = harmonic_sum_tempogram(f, tpg, 4, 1.)
            T1, T2, S1 = tempo_estimation(f, tpg)

            with open(os.path.join('./Ballroom/BallroomAnnotations/ballroomGroundTruth', file_name.replace('.wav', '.bpm').split('/')[-1])) as label_file:
                truth = int(label_file.readline())
            score.append([])
            score[-1].append(p_score(truth, T1, T2, S1))
            score[-1].append(ALOTC_SCORE(truth, T1, T2))
        score = np.array(score)
        out = score.mean(axis=0).tolist()

        table_row = [genre]
        for p in out:
            table_row.append("{:.4f}".format(p))
        table.add_row(table_row)

    print(table)
    print()

Running with window_length 800
+----------------+---------+--------+
|     Genre      | P-score | ALOTC  |
+----------------+---------+--------+
|     ChaCha     |  0.0000 | 0.0000 |
|      Jive      |  0.2982 | 0.6000 |
|   Quickstep    |  0.5202 | 0.9268 |
|     Rumba      |  0.0000 | 0.0000 |
|     Samba      |  0.0037 | 0.0116 |
|     Tango      |  0.0947 | 0.2442 |
| Viennese waltz |  0.4832 | 0.9538 |
|     Waltz      |  0.0224 | 0.0455 |
+----------------+---------+--------+

Running with window_length 2000
+----------------+---------+--------+
|     Genre      | P-score | ALOTC  |
+----------------+---------+--------+
|     ChaCha     |  0.0000 | 0.0000 |
|      Jive      |  0.2959 | 0.5833 |
|   Quickstep    |  0.5485 | 0.9390 |
|     Rumba      |  0.0000 | 0.0000 |
|     Samba      |  0.0035 | 0.0116 |
|     Tango      |  0.1093 | 0.2791 |
| Viennese waltz |  0.4918 | 0.9538 |
|     Waltz      |  0.0313 | 0.0636 |
+----------------+---------+--------+

Running with window_len

Autocorrelation tempogram

In [28]:
def tempogram_auto(nv_curve, sr, window_size=512, hop_size=None):
    lag, _, tempogram = st_acf(nv_curve, sr, window_size, hop_size=hop_size)
    tempogram = normalize(tempogram)
    freq_scale = 60 / lag[1:]
    freq_scale = np.concatenate(([0], freq_scale))
    return freq_scale, tempogram

In [15]:
for window_size in [800, 2000, 2400]:#
    print('Running with window_length', window_size)
    table = PrettyTable(["Genre", "P-score", "ALOTC"])
    ratio_list = []
    for genre in genres:
        score = []
        ratio = []
        dir = os.path.join(path, genre)
        files = os.listdir(dir)
        for file_name in files:
            data, sr = librosa.load(os.path.join(dir, file_name), sr=None)
            hop_size = sr // lw_sr
            t, spfx_enhance = spectral_flux(data, sr, hop_size, window_size, g, mean_size, lag=1)
            f, tpg = tempogram_auto(spfx_enhance, lw_sr)
            T1, T2, S1 = tempo_estimation(f, tpg)

            with open(os.path.join('./Ballroom/BallroomAnnotations/ballroomGroundTruth', file_name.replace('.wav', '.bpm').split('/')[-1])) as label_file:
                truth = int(label_file.readline())
            score.append([])
            score[-1].append(p_score(truth, T1, T2, S1))
            score[-1].append(ALOTC_SCORE(truth, T1, T2))
        score = np.array(score)
        out = score.mean(axis=0).tolist()

        table_row = [genre]
        for p in out:
            table_row.append("{:.4f}".format(p))
        table.add_row(table_row)
    print(table)
    print()

Running with window_length 800
+----------------+---------+--------+
|     Genre      | P-score | ALOTC  |
+----------------+---------+--------+
|     ChaCha     |  0.5056 | 0.9820 |
|      Jive      |  0.4533 | 0.9333 |
|   Quickstep    |  0.4480 | 0.8780 |
|     Rumba      |  0.4429 | 0.8980 |
|     Samba      |  0.3658 | 0.7209 |
|     Tango      |  0.5180 | 0.9651 |
| Viennese waltz |  0.5182 | 0.9231 |
|     Waltz      |  0.2398 | 0.4455 |
+----------------+---------+--------+

Running with window_length 2000
+----------------+---------+--------+
|     Genre      | P-score | ALOTC  |
+----------------+---------+--------+
|     ChaCha     |  0.5136 | 0.9910 |
|      Jive      |  0.4523 | 0.9333 |
|   Quickstep    |  0.4181 | 0.8537 |
|     Rumba      |  0.4432 | 0.8980 |
|     Samba      |  0.3701 | 0.7326 |
|     Tango      |  0.5170 | 0.9651 |
| Viennese waltz |  0.4795 | 0.8923 |
|     Waltz      |  0.3402 | 0.6545 |
+----------------+---------+--------+

Running with window_len

# Question 3: 

In [33]:
def genTempogram(nv_curve, sr, wsize_f, wsize_t, hop_f, hop_t=None, harms=4, alpha=1.):
    tpg_fourier = np.abs(librosa.stft(nv_curve, n_fft=wsize_f, hop_length=hop_f))
    freq_fourier = np.arange(wsize_f // 2 + 1) * sr / wsize_f * 60
    freq_fourier, tpg_fourier = harmonic_sum_tempogram(freq_fourier, tpg_fourier, harms=harms, alpha=alpha)
    

    lag, _, tpg_auto = st_acf(nv_curve, sr, wsize_t, hop_size=hop_t)
    freq_auto = 60 / lag[1:]
    freq_auto = np.concatenate(([0], freq_auto))

    tpg_auto = normalize(tpg_auto)
    tpg_fourier = normalize(tpg_fourier)
    tpg_fourier = tpg_fourier.mean(axis=1)
    tpg_auto = tpg_auto.mean(axis=1)

    # Pool the frequency scales
    pool_scale = (freq_fourier[1:] + freq_fourier[:-1]) / 2

    # Transform the autocorrelation tempogram to match the pooled scale
    transformed_tpg_auto = np.zeros(len(pool_scale) - 1)
    for i in range(len(pool_scale) - 1):
        f1, f2 = pool_scale[i], pool_scale[i + 1]
        p_idx = (freq_auto > f1) & (freq_auto < f2)
        p_value = freq_auto[p_idx]
        if len(p_value) > 0:
            transformed_tpg_auto[i] = np.max(tpg_auto[p_idx])

    #combine fourier and autocorrelation
    cfp = tpg_fourier[1:-1] * transformed_tpg_auto
    return freq_auto[1:-1], cfp[:, None]

In [34]:
window_size = 1024
lw_sr = 100
table = PrettyTable(["Genre", "P-score", "ALOTC"])
ratio_list = []
for genre in genres:
    score = []
    ratio = []
    dir = os.path.join(path, genre)
    files = os.listdir(dir)
    for file_name in files:
        data, sr = librosa.load(os.path.join(dir, file_name), sr=None)
        hop_size = sr // lw_sr
        t, spfx_enhance = spectral_flux(data, sr, hop_size, window_size, g, mean_size, lag=1)
        f, tpg = genTempogram(spfx_enhance, lw_sr, wsize_f=2000, wsize_t=512, hop_f=50)
        T1, T2, S1 = tempo_estimation(f, tpg)
        with open(os.path.join('./Ballroom/BallroomAnnotations/ballroomGroundTruth', 
                               file_name.replace('.wav', '.bpm').split('/')[-1])) as label_file:
            truth = int(label_file.readline())
        score.append([])
        score[-1].append(p_score(truth, T1, T2, S1))
        score[-1].append(ALOTC_SCORE(truth, T1, T2))
    score = np.array(score)
    out = score.mean(axis=0).tolist()

    table_row = [genre]
    for p in out:
        table_row.append("{:.4f}".format(p))
    table.add_row(table_row)
print(table)

+----------------+---------+--------+
|     Genre      | P-score | ALOTC  |
+----------------+---------+--------+
|     ChaCha     |  0.0112 | 0.0180 |
|      Jive      |  0.0387 | 0.0667 |
|   Quickstep    |  0.0000 | 0.0000 |
|     Rumba      |  0.0783 | 0.1633 |
|     Samba      |  0.0223 | 0.0349 |
|     Tango      |  0.1228 | 0.2093 |
| Viennese waltz |  0.0053 | 0.0154 |
|     Waltz      |  0.0046 | 0.0091 |
+----------------+---------+--------+


# Question 4: Beat tracking

In [5]:
def evaluate(label_seq, pred_seq):
    tolerance = 0.07

    # Initialize arrays to store positive predictions (p_pool) and positive labels (r_pool)
    p_pool = np.zeros(pred_seq.shape)
    r_pool = np.zeros(label_seq.shape)

    for i in range(len(label_seq)):
        # Find predicted beats within the tolerance window around the current label
        p_idx = np.where((pred_seq > label_seq[i] - tolerance) & (pred_seq < label_seq[i] + tolerance))[0]
        p_beats = pred_seq[p_idx]

        # Determine the best predicted beat if there are multiple within the tolerance window
        if len(p_beats) > 1:
            dist = np.abs(p_beats - label_seq[i])
            best_idx = p_idx[np.argmax(dist)]
        # Use the single predicted beat if there's only one within the tolerance window
        elif len(p_beats) == 1:
            best_idx = p_idx[0]
        else: continue

        # Check if the best predicted beat has already been assigned to a label
        if p_pool[best_idx] > 0: continue
        else:
            # Assign the best predicted beat as a true positive and mark the label as a positive
            p_pool[best_idx] = 1
            r_pool[i] = 1
            
    # Count the true positives, false positives, and false negatives
    tp = np.count_nonzero(p_pool)
    fp = len(p_pool) - tp
    fn = len(r_pool) - tp
    return tp, fp, fn

In [6]:
path = './Ballroom/BallroomData/'
genres = ['ChaCha', 'Jive', 'Quickstep', 'Rumba', 'Samba', 'Tango', 'Viennese waltz', 'Waltz']

In [7]:
table = PrettyTable(["Genre", "Precision", "Recall", "F-scores"])

for genre in genres:
    score = []
    dir = os.path.join(path, genre)
    files = os.listdir(dir)
    for file_name in files:
        data, sr = librosa.load(os.path.join(dir, file_name), sr=None)
        tempo, beats = librosa.beat.beat_track(y=data,sr=sr)
        timestamps = np.round(librosa.frames_to_time(beats, sr=sr), decimals=3)
        label = np.loadtxt(os.path.join('./Ballroom/BallroomAnnotations-master', file_name.replace('.wav', '.beats').split('/')[-1]))
        score.append(evaluate(label[:, 0], timestamps))

    score = np.array(score).sum(axis=0)
    p = score[0] / (score[0] + score[1])
    r = score[0] / (score[0] + score[2])
    f = 2 * p * r / (p + r)
    
    table_row = [genre]
    for sc in [p,r,f]:
        table_row.append("{:.4f}".format(sc))
    table.add_row(table_row)

print(table)

+----------------+-----------+--------+----------+
|     Genre      | Precision | Recall | F-scores |
+----------------+-----------+--------+----------+
|     ChaCha     |   0.8991  | 0.8886 |  0.8938  |
|      Jive      |   0.8302  | 0.5393 |  0.6538  |
|   Quickstep    |   0.8421  | 0.4319 |  0.5710  |
|     Rumba      |   0.7554  | 0.7979 |  0.7761  |
|     Samba      |   0.5492  | 0.6121 |  0.5790  |
|     Tango      |   0.8404  | 0.7836 |  0.8110  |
| Viennese waltz |   0.8857  | 0.6747 |  0.7659  |
|     Waltz      |   0.5308  | 0.6754 |  0.5944  |
+----------------+-----------+--------+----------+
