In [None]:
import os

import IPython.display as ipd
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn as skl
import sklearn.utils, sklearn.preprocessing, sklearn.decomposition, sklearn.svm
import librosa
import librosa.display
import ast
from scipy import stats

Helper function to load the CSV files, taken from the FMA repository (the source of the dataset).

In [3]:
def load(filepath):
    """Read one of the FMA metadata CSV files into a DataFrame.

    The parsing options are picked from the base name of ``filepath``:

    * contains 'features' or 'echonest': first column as index, three header rows;
    * contains 'genres': first column as index, single header row;
    * contains 'tracks': first column as index, two header rows, followed by
      the dtype conversions performed below.

    Returns None when the base name contains none of these keywords.
    """
    name = os.path.basename(filepath)

    # Both tables share the same three-level column header.
    if 'features' in name or 'echonest' in name:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'genres' in name:
        return pd.read_csv(filepath, index_col=0)

    if 'tracks' not in name:
        return None

    tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])

    # These columns hold Python literals serialised as text: parse them back.
    literal_columns = [('track', 'tags'), ('album', 'tags'), ('artist', 'tags'),
                       ('track', 'genres'), ('track', 'genres_all')]
    for key in literal_columns:
        tracks[key] = tracks[key].map(ast.literal_eval)

    date_columns = [('track', 'date_created'), ('track', 'date_recorded'),
                    ('album', 'date_created'), ('album', 'date_released'),
                    ('artist', 'date_created'), ('artist', 'active_year_begin'),
                    ('artist', 'active_year_end')]
    for key in date_columns:
        tracks[key] = pd.to_datetime(tracks[key])

    # The subset becomes an ordered categorical so that it can be compared
    # with <= / >= (small < medium < large).
    subset_order = ('small', 'medium', 'large')
    subset_key = ('set', 'subset')
    try:
        subset = tracks[subset_key].astype(
            'category', categories=subset_order, ordered=True)
    except (ValueError, TypeError):
        # the categories and ordered arguments were removed in pandas 0.25
        subset = tracks[subset_key].astype(
            pd.CategoricalDtype(categories=subset_order, ordered=True))
    tracks[subset_key] = subset

    categorical_columns = [('track', 'genre_top'), ('track', 'license'),
                           ('album', 'type'), ('album', 'information'),
                           ('artist', 'bio')]
    for key in categorical_columns:
        tracks[key] = tracks[key].astype('category')

    return tracks

Load metadata and features

In [None]:
# Read the three FMA metadata tables with the `load` helper; the names on the
# left are assigned in the same order as the paths are listed.
metadata_paths = (
    'fma_metadata/tracks.csv',
    'fma_metadata/genres.csv',
    'fma_metadata/features.csv',
)
tracks, genres, features = map(load, metadata_paths)

Extract only the relevant metadata for the tracks in the "small" subset, since that is the one we will be working with: the track id, the track genre, the genre id, and the split (training, validation or test) to which each sample belongs.

In [5]:
# Keep only the rows of the "small" subset. ('set', 'subset') is loaded as an
# ordered categorical (small < medium < large), so `<= 'small'` selects it.
small = tracks[tracks['set', 'subset'] <= 'small']

dataset = pd.DataFrame({
    'track_genre': small['track', 'genre_top'],
    'set': small['set', 'split'],
})

# Build the title -> genre_id lookup once instead of filtering `genres` with a
# boolean mask for every genre name. When a title appears more than once, the
# first occurrence wins, as with the previous `.index[0]` lookup.
genre_id_by_title = pd.Series(genres.index, index=genres['title'])
genre_id_by_title = genre_id_by_title[~genre_id_by_title.index.duplicated(keep='first')]

# Map on plain objects rather than on the categorical so that only genre names
# actually present in the rows are looked up. A name missing from `genres`
# (or a missing top genre) yields NaN instead of an opaque IndexError.
dataset = dataset.assign(
    genre_id=dataset['track_genre'].astype(object).map(genre_id_by_title)
)

print(dataset.head())

# Export the dataset to a csv file (creating the output folder if needed)
os.makedirs('data', exist_ok=True)
dataset.to_csv('data/metadata.csv')


         track_genre       set genre_id
track_id                               
2            Hip-Hop  training       21
5            Hip-Hop  training       21
10               Pop  training       10
140             Folk  training       17
141             Folk  training       17


We will want to extract features from the audio files, therefore we will need to be able to list all of their paths, knowing that they are distributed in different folders under "fma_small"

In [None]:
# Root folder of the audio files, relative to the current working directory.
# Built with os.path so that it resolves on every platform (the previous raw
# string contained doubled backslashes and was only usable on Windows).
dir_path = os.path.join(os.curdir, 'fma_small')

# list to store the absolute path of every mp3 file
pathlist = []
# os.walk visits the sub-folders recursively; a separate loop variable is used
# so that the root `dir_path` is not overwritten while walking.
for current_dir, _dir_names, file_names in os.walk(dir_path):
    pathlist.extend(
        os.path.abspath(os.path.join(current_dir, file_name))
        for file_name in file_names
        if file_name.endswith('.mp3')
    )

# os.walk gives no ordering guarantee: sort so that pathlist[0], pathlist[:100],
# ... designate the same files on every run.
pathlist.sort()

# Show a summary rather than dumping thousands of paths in the output.
print(f"Found {len(pathlist)} mp3 files under {os.path.abspath(dir_path)}")
print(pathlist[:5])

We will then need to go through all the files, extract the relevant features for each file, and store them in a dataframe, associating each file with its track_id (the track_id being the file name stripped of its leading zeros and of its extension).

The next step is to code the feature extraction function and to gather all the features in a dataframe together with the track_id.

For the feature extraction, I will for the moment extract spectral centroids and MFCCs because they seem to be giving quite good results on audio genre classification tasks. Testing will be done on a small number of tracks to see how long it takes to extract all the features, and then I will decide if I can or not extract all features or if I have to go with the already calculated ones.

Note that by default, the features are extracted frame by frame on the audio, meaning that for a single audio file we get a matrix of results. For simplicity's sake, we will follow the approach of the FMA dataset researchers and calculate the statistics associated with the extracted features, namely the mean, standard deviation, min, max, median, skew and kurtosis.
That way we only have to deal with one feature vector per track. Calculating all these statistics still gives us a good depiction of the extracted features, even though we lose the time dependency: we don't simply take the mean! This approach could be discussed in the paper, but as the task is audio classification, a holistic approach should be a good fit for us (plus we are working on a fairly simple problem with only 8 classes).

In [None]:
def extract_features(file_path):
    """Load one audio file and compute its MFCCs and spectral centroid.

    Returns a ``(mfcc, centroid)`` tuple holding the results of
    ``librosa.feature.mfcc`` (20 coefficients requested) and
    ``librosa.feature.spectral_centroid`` for the decoded signal.
    """
    # librosa.load is called with its default settings
    signal, sample_rate = librosa.load(file_path)

    return (
        librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=20),
        librosa.feature.spectral_centroid(y=signal, sr=sample_rate),
    )

def calculate_statistics(features):
    """Summarise a 2-D feature matrix along axis 1 (one value per row).

    Returns a dict whose keys are, in this order: 'mean', 'std', 'min',
    'max', 'kurtosis', 'skew', 'median'. The kurtosis and skew entries are
    pandas Series (computed with DataFrame.kurtosis / DataFrame.skew); the
    other entries come from the corresponding numpy reductions.
    """
    summary = {}

    numpy_reducers = (('mean', np.mean), ('std', np.std),
                      ('min', np.min), ('max', np.max))
    for stat_name, reducer in numpy_reducers:
        summary[stat_name] = reducer(features, axis=1)

    # pandas provides the row-wise kurtosis and skewness
    frame = pd.DataFrame(features)
    summary['kurtosis'] = frame.kurtosis(axis=1)
    summary['skew'] = frame.skew(axis=1)

    summary['median'] = np.median(features, axis=1)
    return summary

def process_audio_files(file_paths):
    """Build one flat record of feature statistics per audio file.

    For each path, the MFCC and spectral centroid matrices returned by
    ``extract_features`` are summarised with ``calculate_statistics``. The
    record holds 'track_id' (file name up to the first '.', without leading
    zeros), then 'mfcc_<row>_<stat>' for every MFCC row and statistic, then
    'centroid_<stat>'.

    A file that raises any exception is reported with a printed message and
    skipped. Returns the list of records, in input order.
    """
    records = []
    for file_path in file_paths:
        try:
            mfcc, centroid = extract_features(file_path)
            mfcc_statistics = calculate_statistics(mfcc)
            centroid_statistics = calculate_statistics(centroid)

            # Only the leading zeros are removed (lstrip): a plain strip would
            # also eat trailing zeros and turn e.g. "10" into "1".
            stem = os.path.basename(file_path).split('.')[0]
            record = {'track_id': stem.lstrip('0')}

            # One column per (MFCC row, statistic) pair
            for coefficient in range(len(mfcc)):
                record.update({
                    f'mfcc_{coefficient}_{stat_name}': per_row[coefficient]
                    for stat_name, per_row in mfcc_statistics.items()
                })

            # Only the first row of the centroid statistics is stored
            record.update({
                f'centroid_{stat_name}': per_row[0]
                for stat_name, per_row in centroid_statistics.items()
            })

            records.append(record)

        except Exception as e:
            print(f"Error processing track {os.path.basename(file_path).split('.')[0].lstrip('0')}: {e}")

    return records

# Smoke test on the first two audio files. A slice is used instead of
# pathlist[0] / pathlist[1] so that a shorter pathlist does not raise IndexError.
file_paths = pathlist[:2]

# Process audio files and create dataframe
df = pd.DataFrame(process_audio_files(file_paths))

print(df)

Now that I have a working method to extract all the features that I want on a list of files, I can apply it to a bigger chunk of the data to see how long it takes. Once that is done, I can start working on PCA to display the data, and on machine learning models to do the classification.

In [None]:
# Time the extraction on the first 100 files. The slice replaces the index
# loop, which raised IndexError when fewer than 100 files were found.
file_paths = pathlist[:100]

# Process audio files and create dataframe
df = pd.DataFrame(process_audio_files(file_paths))

df.to_csv('extracted_features.csv')

print(df)

Some testing shows that 100 files can be processed in around 20 seconds, meaning that all 8000 files can be processed in around 30 minutes.
In order not to have to redo the extraction every time, and to be able to work on the data analysis part, I will export the dataframe to a CSV file so that I can just import it (and not waste 30 minutes).
I will be using a "periodic commit" to write to the CSV file periodically, so that I do not lose all the calculations every time it crashes (because it seems to be crashing at some point but I don't know exactly why).

In [None]:
# Function to split list into chunks
def chunks(lst, n):
    """Yield consecutive slices of ``lst`` holding at most ``n`` items each."""
    # still a generator function: the slices are produced lazily, one by one
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

# Process files in chunks and append each chunk to the CSV as soon as it is
# done, so that the rows already extracted are on disk if a later chunk crashes.
chunk_size = 100
output_path = 'data/extracted_features.csv'

# to_csv does not create missing folders
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# The header goes with the first chunk that actually contains rows. Tying it
# to chunk number 0 left the file without a header whenever every track of the
# first chunk failed.
header_written = False
for i, chunk in enumerate(chunks(pathlist, chunk_size)):
    print(f"Processing chunk {i+1}...")
    df = pd.DataFrame(process_audio_files(chunk))
    if df.empty:
        print(f"No track could be processed in chunk {i+1}, nothing to save")
        continue
    # The first write replaces any previous file, the following ones append.
    # We can use index = False because the index is not important in this case
    df.to_csv(
        output_path,
        mode='a' if header_written else 'w',
        index=False,
        header=not header_written,
    )
    header_written = True
    print(f"Saved processed data for chunk {i+1}")

print("All chunks processed and saved.")

For some reason, some tracks yield errors and cannot be processed.
I added a try/except statement in the process_audio_files function in order to deal with that and not be interrupted every time there is an error.
This way there is no need for a try/except in the main loop, and I can still use my "batch" extraction method.
In the end we get 7994 tracks processed out of 8000, which is quite good and will not impede our ability to do further data analysis.

Now that the features are extracted, we can actually work on the data analysis steps, namely : 
- dimensionality reduction and clustering in order to see if we can already distinguish clusters and infer "rules" to differentiate the different classes
- classification through either "classical" approaches like SVM or machine learning and neural networks oriented methods

Let's try to extract more features from the data in order to see how much better the classifiers perform
Instead of extracting only MFCC and spectral centroid, I will be extracting chroma vector, zero crossing rate, spectral contrast, rolloff and bandwidth

In [10]:
# Define the new extraction function
def extract_more_features(file_path):
    """Load one audio file and compute the extended set of librosa features.

    Returns the tuple ``(mfcc, centroid, chroma, zero_crossing, contrast,
    rolloff, bandwidth)``, each element being the result of the matching
    ``librosa.feature`` function (20 MFCCs requested, defaults elsewhere).
    """
    # librosa.load is called with its default settings
    signal, sample_rate = librosa.load(file_path)
    spectral = {'y': signal, 'sr': sample_rate}

    mfcc = librosa.feature.mfcc(n_mfcc=20, **spectral)
    centroid = librosa.feature.spectral_centroid(**spectral)
    chroma = librosa.feature.chroma_stft(**spectral)
    contrast = librosa.feature.spectral_contrast(**spectral)
    rolloff = librosa.feature.spectral_rolloff(**spectral)
    bandwidth = librosa.feature.spectral_bandwidth(**spectral)

    # The zero crossing rate is computed from the signal alone (no sample rate)
    zero_crossing = librosa.feature.zero_crossing_rate(signal)

    return mfcc, centroid, chroma, zero_crossing, contrast, rolloff, bandwidth

# Define the new process_audio_files function
def process_audio_files_2(file_paths):
    """Build one flat record of extended feature statistics per audio file.

    For each path, the matrices returned by ``extract_more_features`` are
    summarised with ``calculate_statistics``. The record holds 'track_id'
    (file name up to the first '.', without leading zeros) followed, in this
    order, by the columns of mfcc, centroid, chroma, zero_crossing, contrast,
    rolloff and bandwidth. Centroid and zero crossing columns are named
    '<feature>_<stat>'; all the others are named '<feature>_<row>_<stat>'.

    A file that raises any exception is reported with a printed message and
    skipped. Returns the list of records, in input order.
    """
    records = []
    for file_path in file_paths:
        try:
            mfcc, centroid, chroma, zero_crossing, contrast, rolloff, bandwidth = \
                extract_more_features(file_path)

            # (column prefix, frame-wise matrix, is the row number part of the column name?)
            layout = (
                ('mfcc', mfcc, True),
                ('centroid', centroid, False),
                ('chroma', chroma, True),
                ('zero_crossing', zero_crossing, False),
                ('contrast', contrast, True),
                ('rolloff', rolloff, True),
                ('bandwidth', bandwidth, True),
            )
            summaries = [calculate_statistics(matrix) for _, matrix, _ in layout]

            # Only the leading zeros are removed (lstrip): a plain strip would
            # also eat trailing zeros and turn e.g. "10" into "1".
            stem = os.path.basename(file_path).split('.')[0]
            record = {'track_id': stem.lstrip('0')}

            for (prefix, matrix, numbered), summary in zip(layout, summaries):
                if numbered:
                    # One column per (row, statistic) pair
                    for row in range(len(matrix)):
                        for stat_name, per_row in summary.items():
                            record[f'{prefix}_{row}_{stat_name}'] = per_row[row]
                else:
                    # Only the first row of the statistics is stored
                    for stat_name, per_row in summary.items():
                        record[f'{prefix}_{stat_name}'] = per_row[0]

            records.append(record)

        except Exception as e:
            print(f"Error processing track {os.path.basename(file_path).split('.')[0].lstrip('0')}: {e}")

    return records

In [None]:
# Try the new function with the first two files to check for errors and execution time.
# A slice is used instead of pathlist[0] / pathlist[1] so that a shorter
# pathlist does not raise IndexError.
file_paths = pathlist[:2]
df = pd.DataFrame(process_audio_files_2(file_paths))
print(df)

In [None]:
# Function to split list into chunks
# NOTE: this redefines the `chunks` helper of the earlier chunked-extraction
# cell; both definitions behave the same.
def chunks(lst, n):
    """Yield consecutive slices of ``lst`` holding at most ``n`` items each."""
    # still a generator function: the slices are produced lazily, one by one
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

# Process files in chunks and append each chunk to the CSV as soon as it is
# done, so that the rows already extracted are on disk if a later chunk crashes.
chunk_size = 100
output_path = 'data/more_extracted_features.csv'

# to_csv does not create missing folders
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# The header goes with the first chunk that actually contains rows. Tying it
# to chunk number 0 left the file without a header whenever every track of the
# first chunk failed.
header_written = False
for i, chunk in enumerate(chunks(pathlist, chunk_size)):
    print(f"Processing chunk {i+1}...")
    df = pd.DataFrame(process_audio_files_2(chunk))
    if df.empty:
        print(f"No track could be processed in chunk {i+1}, nothing to save")
        continue
    # The first write replaces any previous file, the following ones append.
    # We can use index = False because the index is not important in this case
    df.to_csv(
        output_path,
        mode='a' if header_written else 'w',
        index=False,
        header=not header_written,
    )
    header_written = True
    print(f"Saved processed data for chunk {i+1}")

print("All chunks processed and saved.")