In [3]:
# import libraries
import pandas as pd
import tensorflow as tf
from IPython.display import Audio
import os
import matplotlib.pyplot as plt
import numpy as np
import librosa
import ast
#import tensorflow_io as tfio
import librosa.display
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras

keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

## Data Wrangling

In [12]:
# path to the small directory
SMALL_AUDIO_DIR = 'data/fma_small/'

# function to get the paths to all the songs in the small dataset
def audio_paths(AUDIO_DIR):
    AUDIO_PATHS = []
    # iterate through all the directories with songs in them
    for path in [os.path.join('data/fma_small/', p) 
                 for p in os.listdir('data/fma_small/') 
                 if not (p.endswith('checksums') or p.endswith('.txt') or p.endswith('.DS_Store'))]:
        # add all songs to the list
        AUDIO_PATHS = AUDIO_PATHS + [os.path.join(path, track).replace('\\', '/') for track in os.listdir(path)]
    
    return AUDIO_PATHS

# store all the small paths
SMALL_PATHS = audio_paths(SMALL_AUDIO_DIR)

In [13]:
x, sr = librosa.load(SMALL_PATHS[0], sr=None, mono=True)
print('Duration: {:.2f}s, {} samples'.format(x.shape[-1] / sr, x.size))

start, end = 7, 17
Audio(data=x[start*sr:end*sr], rate=sr)

NoBackendError: 

In [14]:
# function to load metadata
# adapted from https://github.com/mdeff/fma/blob/master/utils.py
def metadata_load(filepath):

    filename = os.path.basename(filepath)

    if 'features' in filename:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'echonest' in filename:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'genres' in filename:
        return pd.read_csv(filepath, index_col=0)

    if 'tracks' in filename:
        tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])

        COLUMNS = [('track', 'tags'), ('album', 'tags'), ('artist', 'tags'),
                   ('track', 'genres'), ('track', 'genres_all')]
        for column in COLUMNS:
            tracks[column] = tracks[column].map(ast.literal_eval)

        COLUMNS = [('track', 'date_created'), ('track', 'date_recorded'),
                   ('album', 'date_created'), ('album', 'date_released'),
                   ('artist', 'date_created'), ('artist', 'active_year_begin'),
                   ('artist', 'active_year_end')]
        for column in COLUMNS:
            tracks[column] = pd.to_datetime(tracks[column])

        SUBSETS = ('small', 'medium', 'large')
        try:
            tracks['set', 'subset'] = tracks['set', 'subset'].astype(
                    pd.CategoricalDtype(categories=SUBSETS, ordered=True))
        except ValueError:
            # the categories and ordered arguments were removed in pandas 0.25
            tracks['set', 'subset'] = tracks['set', 'subset'].astype(
                     pd.CategoricalDtype(categories=SUBSETS, ordered=True))

        COLUMNS = [('track', 'genre_top'), ('track', 'license'),
                   ('album', 'type'), ('album', 'information'),
                   ('artist', 'bio')]
        for column in COLUMNS:
            tracks[column] = tracks[column].astype('category')

        return tracks

In [15]:
# function to get genre information for each track ID
def track_genre_information(GENRE_PATH, TRACKS_PATH, FILE_PATHS, subset):
    """
    GENRE_PATH (str): path to the csv with the genre metadata
    TRACKS_PATH (str): path to the csv with the track metadata
    FILE_PATHS (list): list of paths to the mp3 files
    subset (str): the subset of the data desired
    """
    # get the genre information
    genres = pd.read_csv(GENRE_PATH)

    # load metadata on all the tracks
    tracks = metadata_load(TRACKS_PATH)

    # focus on the specific subset tracks
    subset_tracks = tracks[tracks['set', 'subset'] <= subset]

    # extract track ID and genre information for each track
    subset_tracks_genre = np.array([np.array(subset_tracks.index), 
                                  np.array(subset_tracks['track', 'genre_top'])]).T
    
    # extract track indices from the file paths
    track_indices = []
    for path in FILE_PATHS:
        track_indices.append(path.split('/')[-1].split('.')[0].lstrip('0'))

    # get the genre associated with each file path, thanks to the path ID
    track_indices = pd.DataFrame({'file_path':FILE_PATHS,'track_id':np.array(track_indices).astype(int)})
    tracks_genre_df = pd.DataFrame({'track_id': subset_tracks_genre[:,0], 'genre': subset_tracks_genre[:,1]})
    track_genre_data = track_indices.merge(tracks_genre_df, how='left')
    
    # label classes with numbers
    encoder = LabelEncoder()
    track_genre_data['genre_nb'] = encoder.fit_transform(track_genre_data.genre)
    
    return track_genre_data

# get genre information for all tracks from the small subset
GENRE_PATH = 'data/fma_metadata/genres.csv'
TRACKS_PATH = 'data/fma_metadata/tracks.csv'
subset = 'small'

small_tracks_genre = track_genre_information(GENRE_PATH, TRACKS_PATH, SMALL_PATHS, subset)

In [16]:
# visualize the first few rows to confirm each file path has an associated genre
print(small_tracks_genre.head())

                       file_path track_id    genre  genre_nb
0  data/fma_small/000/000002.mp3        2  Hip-Hop         3
1  data/fma_small/000/000005.mp3        5  Hip-Hop         3
2  data/fma_small/000/000010.mp3       10      Pop         6
3  data/fma_small/000/000140.mp3      140     Folk         2
4  data/fma_small/000/000141.mp3      141     Folk         2


In [8]:
# split these paths and associated genres into training and test sets
SMALL_AUDIO_TRAIN, SMALL_AUDIO_TEST = train_test_split(SMALL_PATHS, test_size=0.2, random_state=42)

In [10]:
# def get_label(file_path, genre_df=small_tracks_genre):
#     path = file_path.numpy()
#     path = path.decode("utf-8")
#     label = genre_df.loc[genre_df.file_path == file_path,'genre_nb'].values[0]
#     return tf.constant([label])

# for i in train_paths_dataset:
#     sample = i
    
# get_label(i)

In [11]:
# create the tf datasets for training and testing
# store the paths
train_paths_dataset = tf.data.Dataset.list_files(SMALL_AUDIO_TRAIN)
test_paths_dataset = tf.data.Dataset.list_files(SMALL_AUDIO_TEST)

# window size = number of observations we want to use
window_size = 100

# using librosa to load audio
def load_audio(file_path):
    path = file_path.numpy()
    path = path.decode("utf-8")
    
    return librosa.load(path, sr=None, mono=True)

# define a function to get the label associated with a file path
def get_label(file_path, genre_df=small_tracks_genre):
    path = file_path.numpy()
    path = path.decode("utf-8")
    label = genre_df.loc[genre_df.file_path == file_path,'genre_nb'].values[0]
    return tf.constant([label])

# define a function that extracts the desired features from a file path
def get_audio(file_path, window_size=window_size):
    audio = tf.py_function(load_audio, [file_path], tf.float32)
    audio = tf.expand_dims(audio,-1)
    filtered_audio = audio[:window_size,:]

    return filtered_audio

# process the path
def process_path(file_path, window_size=window_size):
    label = get_label(file_path)
    audio = get_audio(file_path, window_size)

    return audio, label

# parser, wrap around the processing function and specify output shape
def parser(file_path, window_size=window_size):
    audio, label = tf.py_function(process_path, [file_path], (tf.float32, tf.int32))
    audio.set_shape((window_size,1))
    label.set_shape((1,))

    return audio, label

# create the dataset
# Set `num_parallel_calls` so multiple images are loaded/processed in parallel.
train_data = train_paths_dataset.map(parser) 
                                    #  num_parallel_calls=tf.data.experimental.AUTOTUNE)

# batch and prefetch
train_data = train_data.batch(32)

# `prefetch` lets the dataset fetch batches in the background while the model
# is training.
train_data = train_data.prefetch(1)


# re-create for the test data
test_data = test_paths_dataset.map(parser)
                                    # num_parallel_calls=tf.data.experimental.AUTOTUNE)

# batch and prefetch
test_data = test_data.batch(32)
test_data = test_data.prefetch(1)

## Modeling

In [12]:
# create a simple model
# architecture and structure chosen somewhat randomly, tuning could happen later
model = tf.keras.Sequential([
                             tf.keras.layers.Conv1D(filters=128,
                                                    kernel_size=3,
                                                    activation='relu',
                                                    input_shape=[window_size,1],
                                                    name = 'conv1'),
                             
                             tf.keras.layers.MaxPooling1D(name='max1'),

                             tf.keras.layers.Conv1D(filters=64,
                                                    kernel_size=3,
                                                    activation='relu',
                                                    name='conv2'),
                             
                             tf.keras.layers.MaxPooling1D(name='max2'),

                             tf.keras.layers.Dropout(0.5, name='dropout'),

                             tf.keras.layers.Flatten(name='flatten'),
                             tf.keras.layers.Dense(512, activation='relu', name='dense1'),
                             tf.keras.layers.Dense(8, activation='softmax',name='dense2')                  
])

# compile
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
# train the model
epochs=20
history = model.fit(
    train_data,
    epochs=epochs)

Epoch 1/20
Epoch 2/20
12/60 [=====>........................] - ETA: 8:46 - loss: 2.0422 - accuracy: 0.1953

In [None]:
# evaluate on the test data
model.evaluate(test_data)