In [None]:
# Iterate the feature_extract function according to the "tracks_with_genre_medium.csv"

In [2]:
import librosa as lb
import csv
import numpy as np
import pandas as pd
import os
import essentia.standard as es

In [4]:
# Directory that holds the mp3 audio files. Change this path if your files live elsewhere.
# Keep the trailing slash: file paths are built by concatenating this value with the file name.
AUDIO_FILE_HOME='../datasets/fma_medium_flatten/'

# Directory where the feature CSV output files are written.
FEATURE_OUTPUT_HOME='raw/features'

# Path of the metadata file for the audio tracks.
META_FILE = 'raw/meta/tracks_with_genre_medium.csv'

In [3]:
def save_to_csv(data, columns, output_filename):
    """Append one row of values to a CSV file under FEATURE_OUTPUT_HOME.

    Args:
        data: Sequence of cell values that make up the single row.
        columns: Column labels used to build the one-row DataFrame; they are
            not written to the file (the row is appended without a header).
        output_filename: File name, relative to FEATURE_OUTPUT_HOME.
    """
    row_frame = pd.DataFrame([data], columns=columns)
    target_path = f"{FEATURE_OUTPUT_HOME}/{output_filename}"
    # Append mode with no header/index: the file only ever gains raw data rows here.
    row_frame.to_csv(target_path, mode='a', header=False, index=False)

def extract_feature_stats(y, sr, feature_func, *args, **kwargs):
    """Run a feature function and summarise its output along axis 1.

    Args:
        y: Audio signal, forwarded to ``feature_func`` as the keyword ``y``.
        sr: Sample rate, forwarded to ``feature_func`` as the keyword ``sr``.
        feature_func: Callable invoked with ``y`` and ``sr`` as keywords plus
            any extra positional/keyword arguments given here.
        *args: Extra positional arguments for ``feature_func``.
        **kwargs: Extra keyword arguments for ``feature_func``.

    Returns:
        A ``(mean, variance)`` tuple, each computed over axis 1 of the
        feature function's result.
    """
    frames = feature_func(*args, y=y, sr=sr, **kwargs)
    return np.mean(frames, axis=1), np.var(frames, axis=1)

def extract_zero_crossings(y, sr, filename, output_filename):
    """Count the zero crossings of a signal and append the count to a CSV.

    Args:
        y: Audio signal passed to ``lb.zero_crossings``.
        sr: Sample rate; accepted for signature parity with the other
            extractors but not used here.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    crossing_flags = lb.zero_crossings(y)
    crossing_count = np.sum(crossing_flags)
    row = [filename, crossing_count]
    save_to_csv(row, ['track_id', 'zero_crossings'], output_filename)

def extract_tempo(y, sr, filename, output_filename):
    """Estimate the tempo of a signal and append it to a CSV.

    Args:
        y: Audio signal used to compute the onset strength envelope.
        sr: Sample rate of ``y``.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    onset_env = lb.onset.onset_strength(y=y, sr=sr)
    tempo, _ = lb.beat.beat_track(onset_envelope=onset_env, sr=sr)
    # Depending on the librosa version, beat_track may return the tempo as a
    # NumPy array instead of a scalar. Written as-is, an array lands in the CSV
    # as a bracketed string (e.g. "[120.2]"), so collapse it to a plain float.
    tempo_value = float(np.ravel(tempo)[0])
    # Tempo is a single value per track, so no variance is stored for it.
    save_to_csv([filename, tempo_value], ['track_id', 'tempo'], output_filename)

def extract_spectral_centroid(y, sr, filename, output_filename):
    """Append the mean and variance of the spectral centroid to a CSV.

    Args:
        y: Audio signal.
        sr: Sample rate of ``y``.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    centroid = lb.feature.spectral_centroid(y=y, sr=sr)
    # Statistics are taken over the whole returned array (no axis argument).
    row = [filename, np.mean(centroid), np.var(centroid)]
    labels = ['track_id', 'spectral_centroid_mean', 'spectral_centroid_var']
    save_to_csv(row, labels, output_filename)

def extract_spectral_rolloff(y, sr, filename, output_filename):
    """Append the mean and variance of the spectral roll-off to a CSV.

    Args:
        y: Audio signal.
        sr: Sample rate of ``y``.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    rolloff = lb.feature.spectral_rolloff(y=y, sr=sr)
    # Statistics are taken over the whole returned array (no axis argument).
    rolloff_mean = np.mean(rolloff)
    rolloff_var = np.var(rolloff)
    labels = ['track_id', 'spectral_rolloff_mean', 'spectral_rolloff_var']
    save_to_csv([filename, rolloff_mean, rolloff_var], labels, output_filename)

def extract_chroma_stft(y, sr, filename, output_filename):
    """Append per-row mean and variance of the chroma STFT to a CSV.

    Args:
        y: Audio signal.
        sr: Sample rate of ``y``.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    means, variances = extract_feature_stats(y, sr, lb.feature.chroma_stft)
    row = [filename, *means, *variances]
    # Labels: all twelve "_mean" columns first, then all twelve "_var" columns.
    labels = ['track_id']
    labels += [f'chroma_stft_{i}_mean' for i in range(12)]
    labels += [f'chroma_stft_{i}_var' for i in range(12)]
    save_to_csv(row, labels, output_filename)

def extract_mfccs(y, sr, filename, output_filename):
    """Append per-coefficient mean and variance of 20 MFCCs to a CSV.

    Args:
        y: Audio signal.
        sr: Sample rate of ``y``.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    means, variances = extract_feature_stats(y, sr, lb.feature.mfcc, n_mfcc=20)
    row = [filename, *means, *variances]
    # Labels are numbered 1..20: all "_mean" columns first, then all "_var" columns.
    labels = ['track_id']
    labels += [f'MFCC_{i}_mean' for i in range(1, 21)]
    labels += [f'MFCC_{i}_var' for i in range(1, 21)]
    save_to_csv(row, labels, output_filename)

def extract_harmony_percussive(y, sr, filename, output_filename):
    """Append RMS statistics of the harmonic and percussive components to a CSV.

    The signal is split with ``lb.effects.hpss``; for each component the mean
    and variance of ``lb.feature.rms`` are stored.

    Args:
        y: Audio signal to separate.
        sr: Sample rate; accepted for signature parity with the other
            extractors but not used here.
        filename: Value stored in the ``track_id`` position of the row.
        output_filename: CSV file name handed to ``save_to_csv``.
    """
    y_harmonic, y_percussive = lb.effects.hpss(y)
    # Compute each RMS curve once and reuse it for both statistics
    # (previously the same curve was recomputed for the mean and the variance).
    rms_harmonic = lb.feature.rms(y=y_harmonic)
    rms_percussive = lb.feature.rms(y=y_percussive)
    row = [
        filename,
        np.mean(rms_harmonic),
        np.var(rms_harmonic),
        np.mean(rms_percussive),
        np.var(rms_percussive),
    ]
    columns = ['track_id', 'rms_harmonic_mean', 'rms_harmonic_var', 'rms_percussive_mean', 'rms_percussive_var']
    save_to_csv(row, columns, output_filename)
    
def extract_key_scale(filename, ouput_filename):
    """Estimate key, scale and strength of an audio file and append them to a CSV.

    Unlike the other extractors, this one loads the audio itself (through
    essentia's ``MonoLoader``) from ``AUDIO_FILE_HOME`` plus ``filename``.

    Args:
        filename: Audio file name inside AUDIO_FILE_HOME. The track id is the
            file name with its last four characters removed.
        ouput_filename: CSV file name handed to ``save_to_csv`` (the parameter
            name's spelling is kept for compatibility with existing callers).
    """
    # Drop the last four characters (e.g. a ".mp3" suffix) to get the track id.
    track_id = filename[:-4]

    audio = es.MonoLoader(filename=f"{AUDIO_FILE_HOME}{filename}")()

    # KeyExtractor yields the key, the scale (mode) and a strength value.
    key, scale, strength = es.KeyExtractor()(audio)

    row = [track_id, key, scale, strength]
    save_to_csv(row, ['track_id', 'key', 'scale', 'strength'], ouput_filename)
    
# Add more feature extraction functions if necessary, following the pattern shown above.

In [12]:
# Create the per-feature CSV files, each containing only its header row.
# NOTE: this (re)creates every file, so any rows already present are discarded.

# Make sure the output directory exists; to_csv fails if it does not.
os.makedirs(FEATURE_OUTPUT_HOME, exist_ok=True)

# Maps each output file name to its column labels.
headers = {
    'zero_crossings_m.csv': ['track_id', 'zero_crossings'],
    'tempo_m.csv': ['track_id', 'tempo'],
    'spectral_centroid_m.csv': ['track_id', 'spectral_centroid_mean', 'spectral_centroid_var'],
    'spectral_rolloff_m.csv': ['track_id', 'spectral_rolloff_mean', 'spectral_rolloff_var'],
    'chroma_stft_m.csv': ['track_id'] + [f'chroma_stft_{i}_mean' for i in range(12)] + [f'chroma_stft_{i}_var' for i in range(12)],
    'mfccs_m.csv': ['track_id'] + [f'MFCC_{i}_mean' for i in range(1, 21)] + [f'MFCC_{i}_var' for i in range(1, 21)],
    'hpss_m.csv': ['track_id', 'rms_harmonic_mean', 'rms_harmonic_var', 'rms_percussive_mean', 'rms_percussive_var'],
    'key_scale_m.csv': ['track_id', 'key', 'scale', 'strength']
}

# An empty DataFrame with the given columns writes just the header line.
for key, value in headers.items():
    pd.DataFrame(columns=value).to_csv(f"{FEATURE_OUTPUT_HOME}/{key}", index=False)

In [5]:
# Helper for splitting the work into three parts so each part can be processed separately
def split_list_by_three(lst):
    """Split a sequence into three consecutive parts of near-equal length.

    When the length is not divisible by three, the leftover one or two items
    go to the earliest parts (one extra item each).

    Args:
        lst: Sliceable sequence to split.

    Returns:
        A tuple ``(first_part, second_part, third_part)`` of slices of ``lst``.
    """
    base_size, leftover = divmod(len(lst), 3)

    # The first part absorbs one leftover item if there is any, the second
    # part absorbs one only when there are two.
    first_cut = base_size + (1 if leftover >= 1 else 0)
    second_cut = first_cut + base_size + (1 if leftover >= 2 else 0)

    return lst[:first_cut], lst[first_cut:second_cut], lst[second_cut:]

In [8]:
# List the entries of the audio directory in sorted order.
file_list = os.listdir(AUDIO_FILE_HOME)
file_list.sort()
# To process the files in three separate runs instead, split the list:
# first, second, third = split_list_by_three(file_list)

In [9]:
len(file_list)

25000

In [None]:

# This takes a lot of time. You can skip it and use the already existing files.
# Every extractor appends its own row to its own CSV, so a failure partway
# through a track leaves that track present in some CSVs and missing in others.
print("total number: ", len(file_list))
for index, filename in enumerate(file_list):
    # Track id = file name without its last four characters (e.g. ".mp3").
    track_id = filename[:-4]
    if index % 100 == 0:
        # Progress report every 100 files (f-string so the index is interpolated).
        print(f'complete: {index} / ', filename)
    try:
        y, sr = lb.load(f"{AUDIO_FILE_HOME}{filename}")
        extract_zero_crossings(y, sr, track_id, 'zero_crossings_m.csv')
        extract_tempo(y, sr, track_id, 'tempo_m.csv')
        extract_spectral_centroid(y, sr, track_id, 'spectral_centroid_m.csv')
        extract_spectral_rolloff(y, sr, track_id, 'spectral_rolloff_m.csv')
        extract_chroma_stft(y, sr, track_id, 'chroma_stft_m.csv')
        extract_mfccs(y, sr, track_id, 'mfccs_m.csv')
        extract_harmony_percussive(y, sr, track_id, 'hpss_m.csv')
        # Key/scale extraction loads the file itself, so it gets the file name.
        extract_key_scale(filename, 'key_scale_m.csv')
    except Exception as e:
        # Best effort: report which file failed and continue with the next one.
        print(f"failed at index {index} ({filename}): {e}")
        

In [15]:
## Merge the per-feature CSV files into a single table

file_names = ["tempo_m", "hpss_m", "spectral_centroid_m", "spectral_rolloff_m", "zero_crossings_m", "chroma_stft_m", "mfccs_m", 'key_scale_m']

# Load every feature file, in the order listed above.
dfs = []
for file_name in file_names:
    dfs.append(pd.read_csv(f"{FEATURE_OUTPUT_HOME}/{file_name}.csv"))

# Start from the first table and outer-join each remaining one on track_id,
# so a track missing from one feature file is still kept.
merged_df = dfs[0]
for df in dfs[1:]:
    merged_df = pd.merge(merged_df, df, on='track_id', how='outer')
    

In [16]:
# Save the merged feature table to a single CSV file.
# No index argument is passed, so pandas also writes the DataFrame index as the
# first (unnamed) column of the file.
merged_df.to_csv(f"{FEATURE_OUTPUT_HOME}/all_features_medium_with_var.csv")