In [193]:
#imports
import tomllib
import numpy as np
from pprint import pprint
import librosa
import requests
from pydub import AudioSegment
from io import BytesIO
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import matplotlib.pyplot as plt

In [194]:
with open("cfg.toml", "rb") as cfg:
    keys = tomllib.load(cfg)["spotify"]
    c_id = keys["client_id"]
    c_secret = keys["client_secret"]
    auth_manager = SpotifyClientCredentials(client_id=c_id, client_secret=c_secret)
    
sp = spotipy.Spotify(auth_manager=auth_manager)

In [195]:
"""taken from matthew baleanu and Mohamad-Hassan Bahsoun
16khz downsample (default 22.05khz, spotify audio is 44.1khz.)"""
# def downsampleAudio(audio_file_path, target_sample_rate=16000):
#     #load the audio and downsample it. 
#     #librosa.load converts the audio file input (.wav, preferrably) into a time series. 
#     signal, sampling_rate = librosa.load(audio_file_path, sr=target_sample_rate, mono=True)
#     #normalize amplitude of the audio signal
#     signal = signal / np.max(np.abs(signal))
#     return signal, sampling_rate

# #draft preprocessor module. Downsamples and STFTs the audio. 
# def processAudio(audio_file_path):
#     signal, sampling_rate = downsampleAudio(audio_file_path)
#     #stft signal for feature computation
#     stft_signal = librosa.stft(signal, window='hann')
#     return signal, stft_signal

"""default sampling rate set to 16khz as per @bahsoun"""
def processAudio(audio_file_path, target_sample_rate=16000):
    """load the audio and sample it at the target rate. 
    librosa.load converts the audio file into a time series at the desired sampling rate """
    signal, sampling_rate = librosa.load(audio_file_path, sr=target_sample_rate, mono=True)
    """normalize the amplitude of the signal."""
    signal = signal / np.max(np.abs(signal))
    """stft signal for feature computation. probably will be moved. """
    stft_signal = librosa.stft(signal, window='hann')
    return signal, stft_signal, sampling_rate

#test function for visualization. Not necessary for any applications, but useful for sanity checking results, I think. 
def plotSpectrogram(stft_signal, target_sampling_rate):
    #adapted from @baleanu
    plt.figure(figsize=(12, 12))
    #compute the spectrogram power, then to dB
    power_spectrogram = np.abs(stft_signal)**2
    spectrogram_db = librosa.amplitude_to_db(power_spectrogram, ref=np.max)

    #display spectrogram
    librosa.display.specshow(spectrogram_db, sr = target_sampling_rate, x_axis='time', y_axis='linear', cmap='viridis')
    plt.colorbar(label='Power (dB)')
    plt.title('Power Spectrogram')
    plt.xlabel('Time (s)')
    plt.ylabel('Frequency (Hz)')
    plt.show()
    print(stft_signal.shape)

"""as suggested by MVM, the signal we divide up the signal into bins and compute a window size in order to get
    variance for our feature, this way we can determine how relevant they would be for classification/recommendation."""
def divideSignal(signal, bpm, sampling_rate, beats_per_win = 4):
    """process is as such:
    1. get global tempo (bpm)
    2. convert that to beat segments
    3. use it to divide up the song."""
    beat_duration = 60/bpm
    song_length_seconds = len(signal)/sampling_rate
    beat_count = song_length_seconds/beat_duration
    
    """once beat count is computed, compute the window size as a fixed multiple of the beat count
    Round up division on beat_count/beats_per_win to compute the window count. 
    To calculate the window size: the length of the signal is seconds * samples, as that is an audio time series. The window size would 
    therefore be the sample length of len(signal)/window_count rounded up, in order to contain all samples."""
    window_count = int(np.ceil(beat_count/beats_per_win))
    window_size = int(np.ceil(len(signal)/window_count))
    
    """to divide the signal we pad the original signal with zeroes at the end until it is of the proper length
    for consistency and then reshape it."""
    
    divided_signal = np.pad(signal,
                            (0, window_count*window_size-len(signal)),
                            mode="constant",
                            constant_values=0)
    
    """now we must reshape the signal."""
    divided_signal = divided_signal.reshape(window_count, window_size)
    return divided_signal, window_size, window_count

"""need to STFT invidual pieces"""
def divideSTFT(divided_signal):
    """sample STFT calculation is necessary in order to perform preallocation for efficiency"""
    x, y = librosa.stft(divided_signal[0]).shape
    divided_stft_signal = np.zeros((divided_signal.shape[0], x, y), dtype=np.complex128)
    divided_stft_magnitudes = np.zeros((divided_signal.shape[0], x, y))
    for i in range(divided_signal.shape[0]): 
        stft_slice = librosa.stft(divided_signal[i])
        divided_stft_signal[i] = stft_slice
        divided_stft_magnitudes[i] = np.abs(stft_slice)

    return divided_stft_signal, divided_stft_magnitudes

In [46]:
#from Mohamad-Hassan Bahsoun
def computeRMS(signal):
    #RMS is the square root of the average of the squared signal. 
    squared_signal = np.square(signal)
    mean_squared = np.mean(squared_signal)
    rms = np.sqrt(mean_squared)
    #convert RMS to decibels
    rms = 20*np.log10(rms)
    return rms

def computeDynamicRange(signal, rms):
    #Dynamic range here is defined as peak - RMS, as peak - min would yield the max. 
    #decibel conversion in order to avoid computing division, as that ends up being slower. 
    max_level = 20 *np.log10(np.max(np.abs(signal)))
    dynamic_range = max_level - rms
    return dynamic_range

def computeBPM(signal, target_sample_rate):
    """temporary BPM calculation is simply done as librosa.feature.bpm. this is because @baleanu code has not been updated to same branch, 
    and testing of the window division based on BPM relies on this module
    this is to be replaced with the real computeBPM function, already implementedby @baleanu
    real function signature would actually use a stft_signal"""
    bpm = librosa.feature.tempo(y=signal, sr=target_sample_rate)
    return bpm[0]

In [206]:
"""divided signal RMS compute"""
def computeRMS(divided_signal):
    """computes rms over each window, return vector of rms values and average rms"""
    rms = np.zeros((divided_signal.shape[0], 1))
    rms_mean = 0
    for i in range(divided_signal.shape[0]):
        squared_signal = np.square(divided_signal[i])
        mean_squared = np.mean(squared_signal)
        root_mean_squared = np.sqrt(mean_squared)
        "decibel conversion of the rms portion"
        root_mean_squared = 20*np.log10(root_mean_squared)
        rms_mean += root_mean_squared
        rms[i] = root_mean_squared
    """variance calculation""" 
    rms_mean /= divided_signal.shape[0]
    return rms, rms_mean

"""divided dynamic range compute"""
def computeDynamicRange(divided_signal, divided_rms):
    """computes dynamic range  of each window and then average dynamic range"""
    dynamic_range = np.zeros((divided_signal.shape[0], 1))
    dynamic_range_mean = 0
    for i in range(divided_signal.shape[0]):
        dynamic_max = 20 *np.log10(np.max(np.abs(divided_signal[i])))
        dynamic_range_slice = dynamic_max - divided_rms[i]
        dynamic_range[i] = dynamic_range_slice
        dynamic_range_mean += dynamic_range_slice
    dynamic_range_mean /= divided_signal.shape[0]
    return dynamic_range, dynamic_range_mean


"""define a helper function for getSpectralCentroid"""
def computeSpectralCentroid(stft_signal, frequencies):
    # compute the magnitude of the stft
    # x_n = np.abs(stft_signal)
    x_n = stft_signal
    f_n = frequencies[:,None]
   
    # multiply each frequency bin by the magnitude
    #Centroid = (Σₙ₌₀ᴺ⁻¹ [f(n) * x(n)]) / (Σₙ₌₀ᴺ⁻¹ x(n))
    numerator = np.sum(f_n * x_n)
    denominator = np.sum(x_n)

    #if x(n)s are zero for some reason
    epsilon = 1e-6
    denominator = np.where(denominator==0, epsilon, denominator)
    
    spec_c = numerator/denominator
    # print(f"{spec_c.shape}")
    return spec_c

""" spectral centroid code adapted from @bahsoun"""
def computeSpectralCentroidsMean(divided_stft_magnitudes, sampling_rate):
    """spectral centroic calc is: Centroid = (Σₙ₌₀ᴺ⁻¹ [f(n) * x(n)]) / (Σₙ₌₀ᴺ⁻¹ x(n))
    np.fft.rfftfreq conputes f(n), need to compute spectral centroid over window in each piece of the signal
    divided_stft_magnitudes contains the magnitudes the divided signal after each piece has been passed through STFT"""
    frequencies = np.fft.rfftfreq(2048, d=1/sampling_rate)
    spectral_centroids = np.zeros((divided_stft_magnitudes.shape[0], 1))
    # spectral_centroids = np.zeros((divided_stft_magnitudes.shape[0], computeSpectralCentroid(divided_stft_magnitudes[0], frequencies).shape[0]))
    mean_spectral_centroid = 0
    for i in range(divided_stft_magnitudes.shape[0]):
        """np.mean in order to average the spectral centroid over the window."""
        spectral_centroid_piece = computeSpectralCentroid(divided_stft_magnitudes[i], frequencies)
        spectral_centroids[i] = spectral_centroid_piece
        mean_spectral_centroid += spectral_centroid_piece
    mean_spectral_centroid /= divided_stft_magnitudes.shape[0]
    return spectral_centroids, mean_spectral_centroid

"""to compute freq range we need the spectral rolloff
this function is a getter function that computes the thresholds and also trims the input itself"""
def computeSpectralRolloffFrequency(stft_magnitude, sampling_rate, percentile):
    """compute the frequencies once again"""
    frequencies = np.fft.rfftfreq(2048, d=1/sampling_rate)
    
    pass

"""this function calls computeSpectralRolloffFrequency in order to trim each piece of the signal. We then calculate the range 
for the specific interval and average it at the end."""
def computeFrequencyRange(divided_stft_magnitudes, sampling_rate, percentile=0.05):
    """frequency range is max-min after trimming off using spectral rolloff"""
    frequency_ranges = np.zeros((divided_stft_magnitudes.shape[0],1))
    average_frequency_range = 0

    for i in range(divided_stft_magnitudes.shape[0]):
        rolloff = computeSpectralRolloffFrequency(divided_stft_magnitudes[i], sampling_rate, percentile)
        frequency_range_slice = np.max(rolloff) - np.min(rolloff)
        frequency_ranges[i] = frequency_range_slice
        average_frequency_range += frequency_range_slice
    average_frequency_range /= divided_stft_magnitudes.shape[0]
    return frequency_ranges, average_frequency_range

In [204]:
"""test"""
# input = "../audio/Ma Meilleure Ennemie.wav"
input = "../audio/The Weeknd - Out of Time.wav"
signal, stft_signal, sampling_rate = processAudio(input, target_sample_rate = 16000)
# rms = computeRMS(signal)
bpm = computeBPM(signal, sampling_rate)

beat_duration = 60/bpm
song_length_seconds = len(signal)/sampling_rate
beat_count = song_length_seconds/beat_duration

# print(f"Song BPM: {bpm}")
# print(f"Beat duration of the song in seconds: {beat_duration}")
# print(f"Length of the song: {song_length_seconds}")
# print(f"Therefore number of beats for the entire song: {beat_count}")

# beats_per_window = 4
# window_count = int(np.ceil(beat_count/beats_per_window))
# print(f"The window count is: {window_count}, using {beats_per_window} beats per window. This is calculated through beat_count/beats_per_window rounded up.")
# window_size = int(np.ceil(len(signal)/window_count))
# divided_signal_length = window_count * window_size
# print(f"window size at the input sampling rate is therefore: {window_size}")
# print(f"Therefore the entire new reconstructed audio is as such: {divided_signal_length}, which is longer than the original signal: {len(signal)}")
# signal_size_diff = divided_signal_length - len(signal)
# print(f"the difference between the divided signal length and original signal length is: {signal_size_diff} samples")
# plotSpectrogram(stft_signal, sampling_rate)
# print(f"RMS: {rms}dB, BPM: {bpm}")

divided_signal, win_size, win_count = divideSignal(signal, bpm, sampling_rate, beats_per_win=1)
print(f"The shape of the divided signal is as such: {divided_signal.shape}")
# zero_pad_check = divided_signal[divided_signal.shape[0]-1][divided_signal.shape[1]-signal_size_diff-1:divided_signal.shape[1]-1]
# print(f"Check if zero padding: {zero_pad_check}")
# signal2 = np.arange(10)
# div_signal2, _, _ = divideSignal(signal2)
# print(div_signal2[][])
div_rms, div_rms_mean = computeRMS(divided_signal)
# print(f"div_rms: {div_rms}, div_rms mean: {div_rms_mean}")
# print(div_rms.shape)

div_dr, div_dr_mean = computeDynamicRange(divided_signal, div_rms)
# print(f"div_dynamic range: {div_dr}, div_dyanmic range mean: {div_dr_mean}")
# print(div_dr.shape)


The shape of the divided signal is as such: (335, 10236)


In [None]:
div_stft, div_stft_mag = divideSTFT(divided_signal)
# fr= np.fft.rfftfreq(2048, d=1/sampling_rate)
# computeSpectralCentroid(div_stft_mag[0], fr)
# spec_c, mean_spec_c = computeSpectralCentroidsMean(div_stft_mag, sampling_rate)

# print(mean_spec_c)
# print(spec_c)

# print(div_stft_mag[0])


2006.156261709345
[[1092.18779206]
 [2217.55230642]
 [1489.76319831]
 [2305.01012033]
 [1035.34984235]
 [2171.40343814]
 [1251.55992682]
 [2284.56438217]
 [1247.96358268]
 [2318.37612439]
 [1570.81364003]
 [2258.02162922]
 [1061.88356012]
 [2237.85635454]
 [1236.16200258]
 [2429.25392819]
 [1418.95050709]
 [2358.45210274]
 [1413.64753265]
 [2217.76253862]
 [1238.70539343]
 [2129.0171635 ]
 [1190.29635005]
 [2289.42858698]
 [1373.66669586]
 [2129.03830654]
 [1509.64475408]
 [2112.66335324]
 [1486.13732186]
 [2344.73371824]
 [2072.68568119]
 [2311.92726774]
 [1606.05369024]
 [2332.45285268]
 [2121.98651375]
 [2202.95260744]
 [1616.43795429]
 [2125.69401002]
 [1455.19737617]
 [2012.5844844 ]
 [1568.74927823]
 [2587.25225706]
 [2298.2896516 ]
 [2335.84122807]
 [1443.82379984]
 [2096.3882335 ]
 [1465.58204312]
 [2224.13703735]
 [1817.97265907]
 [2153.56308911]
 [2056.49378699]
 [2310.50604328]
 [1468.03592938]
 [2253.66318662]
 [1480.47884778]
 [2016.93825025]
 [1816.64854228]
 [2181.006779