In [1]:
import os

import IPython.display as ipd
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn as skl
import sklearn.utils, sklearn.preprocessing, sklearn.decomposition, sklearn.svm
import librosa
import librosa.display
import ast
import pickle
from scipy.spatial import distance

In [2]:
import hyperparameters as hps
from embedding import embedding as embedding

In [3]:
def get_audio_path(audio_dir, track_id):
    """
    Build the path of a track's mp3 file inside an audio directory.

    The track ID is zero-padded to at least six digits; the first three
    characters of the padded ID name the sub-directory and the full padded
    ID (plus ``.mp3``) names the file.

    Parameters
    ----------
    audio_dir : str
        Directory under which the audio files are stored.
    track_id : int
        Numeric track identifier.

    Returns
    -------
    str
        ``<audio_dir>/<first 3 digits>/<padded id>.mp3`` joined with the
        platform's path separator.

    Examples
    --------
    >>> get_audio_path('fma_small', 2)  # on a POSIX system
    'fma_small/000/000002.mp3'
    """
    padded_id = '{:06d}'.format(track_id)
    subdirectory = padded_id[:3]
    mp3_name = padded_id + '.mp3'
    return os.path.join(audio_dir, subdirectory, mp3_name)

In [4]:
def load(filepath):
    """
    Read a metadata CSV into a DataFrame, choosing the parser by file name.

    The base name of ``filepath`` is checked, in this order, for the
    substrings ``'features'``, ``'echonest'``, ``'genres'`` and ``'tracks'``:

    * features / echonest: first column as index, three header rows.
    * genres: first column as index, single header row.
    * tracks: first column as index, two header rows, followed by per-column
      conversions (literal-evaluated columns, datetimes, categoricals).

    Returns ``None`` when the base name contains none of these substrings.
    """
    name = os.path.basename(filepath)

    # Both of these file kinds are parsed the same way.
    if 'features' in name or 'echonest' in name:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'genres' in name:
        return pd.read_csv(filepath, index_col=0)

    if 'tracks' not in name:
        return None

    tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])

    # Columns stored as Python-literal strings; evaluate them into objects.
    literal_columns = [
        ('track', 'tags'),
        ('album', 'tags'),
        ('artist', 'tags'),
        ('track', 'genres'),
        ('track', 'genres_all'),
    ]
    for col in literal_columns:
        tracks[col] = tracks[col].map(ast.literal_eval)

    # Columns converted to datetimes.
    datetime_columns = [
        ('track', 'date_created'),
        ('track', 'date_recorded'),
        ('album', 'date_created'),
        ('album', 'date_released'),
        ('artist', 'date_created'),
        ('artist', 'active_year_begin'),
        ('artist', 'active_year_end'),
    ]
    for col in datetime_columns:
        tracks[col] = pd.to_datetime(tracks[col])

    # Ordered categorical so subsets can be compared (small < medium < large).
    subset_dtype = pd.api.types.CategoricalDtype(
        categories=('small', 'medium', 'large'), ordered=True)
    tracks['set', 'subset'] = tracks['set', 'subset'].astype(subset_dtype)

    # Remaining columns become plain (unordered) categoricals.
    categorical_columns = [
        ('track', 'genre_top'),
        ('track', 'license'),
        ('album', 'type'),
        ('album', 'information'),
        ('artist', 'bio'),
    ]
    for col in categorical_columns:
        tracks[col] = tracks[col].astype(pd.api.types.CategoricalDtype())

    return tracks


In [5]:
def save_pickle(file_name, file_data):
    """Serialize ``file_data`` with pickle and write it to ``file_name``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten. Returns ``None``.
    """
    with open(file_name, "wb") as out_file:
        pickle.Pickler(out_file).dump(file_data)

In [6]:
def load_pickle(file_name):
    """Read ``file_name`` in binary mode and return the unpickled object.

    Unpickling can execute arbitrary code, so this should only be pointed
    at files produced by a trusted source.
    """
    with open(file_name, "rb") as in_file:
        loaded = pickle.Unpickler(in_file).load()
    return loaded

In [7]:
def mfcc_extraction_func(x, sr):
    """
    Compute 20 MFCCs from an audio signal.

    Pipeline: STFT magnitude (n_fft=2048, hop_length=512) -> squared to a
    power spectrogram -> mel spectrogram -> dB scale -> MFCC.

    Parameters
    ----------
    x : audio samples passed straight to ``librosa.stft``
        (assumed to be a 1-D mono waveform -- TODO confirm with callers).
    sr : sampling rate forwarded to ``librosa.feature.melspectrogram``.

    Returns
    -------
    The array returned by ``librosa.feature.mfcc`` with ``n_mfcc=20``.
    """
    magnitude = np.abs(librosa.stft(x, n_fft=2048, hop_length=512))
    power_spectrogram = magnitude**2
    mel_power = librosa.feature.melspectrogram(sr=sr, S=power_spectrogram)
    mel_db = librosa.power_to_db(mel_power)
    return librosa.feature.mfcc(S=mel_db, n_mfcc=20)

In [8]:
def chroma_extraction_func(x, sr):
    """
    Compute CENS chroma features from an audio signal.

    A constant-Q transform magnitude is computed over 7 octaves with 12 bins
    per octave (84 bins, hop_length=512, tuning=None) and handed to
    ``librosa.feature.chroma_cens`` with 12 chroma bins.

    Parameters
    ----------
    x : audio samples passed straight to ``librosa.cqt``
        (assumed to be a 1-D mono waveform -- TODO confirm with callers).
    sr : sampling rate of ``x``.

    Returns
    -------
    The array returned by ``librosa.feature.chroma_cens``.
    """
    octaves = 7
    bins_per_octave = 12
    cqt_magnitude = np.abs(
        librosa.cqt(
            x,
            sr=sr,
            hop_length=512,
            bins_per_octave=bins_per_octave,
            n_bins=octaves * bins_per_octave,
            tuning=None,
        )
    )
    return librosa.feature.chroma_cens(C=cqt_magnitude, n_chroma=12, n_octaves=octaves)

In [9]:
# Load the metadata tables and the precomputed features.
# Rows with these track IDs are removed from both `tracks` and `features`
# (NOTE(review): the reason these IDs are considered bad is not recorded here).
BAD_TRACK_ID = [98565, 98567, 98569, 99134, 108925, 133297]

tracks = load('../fma_metadata/tracks.csv').drop(labels=BAD_TRACK_ID)
genres = load('../fma_metadata/genres.csv')
features = load('../fma_metadata/features.csv').drop(labels=BAD_TRACK_ID)

# Sanity check: both tables must list exactly the same track IDs, in order.
np.testing.assert_array_equal(features.index, tracks.index)

In [10]:
# Audio directory name handed to get_audio_path by create_test_data.
# Note that get_noised_audio_features prepends "../" to the resulting path.
AUDIO_DIR = "fma_small"
# NOTE(review): this module-level list is not referenced by any visible cell
# (get_noised_audio_features uses a local of the same name) -- confirm whether
# it is still needed.
extracted_features = []
# Boolean mask over `tracks`. The subset column is an ordered categorical
# (small < medium < large), so `<= 'small'` keeps only the 'small' subset.
small = tracks['set', 'subset'] <= 'small'

# Evaluation

In [11]:
def ranking(query, documents, document_ids, dist_func='cos', num_results=10):
    """
    Rank documents by their distance to a query and return the closest IDs.

    Parameters
    ----------
    query : array-like
        Query vector.
    documents : numpy.ndarray
        Collection of document vectors, one per index along the first axis.
    document_ids : array-like
        Identifiers aligned with ``documents``. Must support indexing with an
        integer array (e.g. a NumPy array or a pandas Index).
    dist_func : {'cos', 'norm'}, optional
        ``'cos'`` uses ``scipy.spatial.distance.cosine``; ``'norm'`` uses the
        norm of the difference (``np.linalg.norm``).
    num_results : int, optional
        Maximum number of IDs to return.

    Returns
    -------
    The first ``num_results`` entries of ``document_ids`` ordered by
    increasing distance; same container type as ``document_ids`` indexing
    yields.

    Raises
    ------
    ValueError
        If ``dist_func`` is not one of the supported names. (Previously an
        unknown name left every distance at zero and returned meaningless
        results without any error.)
    """
    # Resolve the metric once instead of re-checking the name for every row.
    if dist_func == 'cos':
        def compute_distance(document):
            return distance.cosine(query, document)
    elif dist_func == 'norm':
        def compute_distance(document):
            return np.linalg.norm(document - query)
    else:
        raise ValueError(
            "Unknown dist_func {!r}; expected 'cos' or 'norm'.".format(dist_func))

    num_documents = documents.shape[0]
    distances = np.zeros(num_documents)
    for i in range(num_documents):
        distances[i] = compute_distance(documents[i])

    # A stable sort keeps equally distant documents in their original order,
    # so the ranking is deterministic when there are ties.
    ranking_result_index = np.argsort(distances, kind='stable')
    return document_ids[ranking_result_index[:num_results]]

In [12]:
def add_random_noise_to_wave(x, noise_level):
    """
    Return a copy of a waveform with uniform random noise added.

    The noise is ``noise_level * np.random.rand(len(x))``, i.e. samples drawn
    uniformly from ``[0, noise_level)`` using NumPy's global random state.

    Unlike an in-place ``x += ...``, the input array is left untouched, so
    callers that keep a reference to the clean signal are not affected.

    Parameters
    ----------
    x : array-like
        Waveform samples.
    noise_level : float
        Scale factor applied to the uniform noise.

    Returns
    -------
    numpy.ndarray
        New array holding the noisy signal. Floating-point inputs keep their
        dtype (e.g. float32 stays float32); other dtypes are returned as the
        result of the floating-point addition instead of raising a casting
        error.
    """
    x = np.asarray(x)
    noise = noise_level * np.random.rand(len(x))
    noised = x + noise
    if np.issubdtype(x.dtype, np.floating):
        # Match what the in-place addition used to produce for float input.
        noised = noised.astype(x.dtype, copy=False)
    return noised

In [13]:
def get_noised_audio_features(filename, feature_extraction_func, noise_level):
    """
    Load an audio file, distort it with random noise and extract features.

    The file is read from ``"../" + filename`` at its native sampling rate
    and mixed down to mono. Only the first ``10 * sr`` samples (ten seconds)
    are kept before noise is added.

    Parameters
    ----------
    filename : str
        Audio path relative to the parent directory.
    feature_extraction_func : callable
        Called as ``feature_extraction_func(samples, sr)``.
    noise_level : float
        Forwarded to ``add_random_noise_to_wave``.

    Returns
    -------
    Whatever ``feature_extraction_func`` returns for the noisy clip.
    """
    waveform, sample_rate = librosa.load("../"+filename, sr=None, mono=True)
    clip = waveform[:10 * sample_rate]
    # Distort the clip before feature extraction.
    noisy_clip = add_random_noise_to_wave(clip, noise_level)
    return feature_extraction_func(noisy_clip, sample_rate)

In [14]:
def create_test_data(all_track_ids, feature_extraction_func, noise_level=0.01, num_data=100):
    """
    Build a noisy test set from randomly chosen tracks.

    ``num_data`` distinct track IDs are drawn from ``all_track_ids`` with
    ``np.random.choice`` (global random state). For each one the audio path
    is resolved under ``AUDIO_DIR`` and noisy features are extracted; a
    progress line is printed after each track has been processed.

    Parameters
    ----------
    all_track_ids : array-like
        Pool of track IDs to sample from.
    feature_extraction_func : callable
        Feature extractor forwarded to ``get_noised_audio_features``.
    noise_level : float, optional
        Noise scale forwarded to ``get_noised_audio_features``.
    num_data : int, optional
        Number of tracks to sample (without replacement).

    Returns
    -------
    tuple
        ``(sampled_track_ids, features)`` where ``features`` is a list with
        one entry per sampled ID, in the same order.
    """
    sampled_ids = np.random.choice(all_track_ids, num_data, replace=False)

    def noised_features_for(tid):
        audio_path = get_audio_path(AUDIO_DIR, tid)
        feats = get_noised_audio_features(audio_path, feature_extraction_func, noise_level)
        print("Creating test data:", tid)
        return feats

    return sampled_ids, [noised_features_for(tid) for tid in sampled_ids]

In [15]:
def evaluate(all_track_ids, all_embedding, test_track_ids, test_embedding, dist_func='cos'):
    """
    Measure top-1 retrieval accuracy of the test embeddings and print it.

    Each test embedding is used as a query against ``all_embedding`` via
    ``ranking``. A query counts as correct when the first ranked ID equals
    the ID of the track the query was derived from.

    Parameters
    ----------
    all_track_ids : array-like
        IDs aligned with ``all_embedding``.
    all_embedding : numpy.ndarray
        Embeddings of the searchable collection.
    test_track_ids : sequence
        Ground-truth ID for each query.
    test_embedding : sequence
        Query embeddings, indexed in parallel with ``test_track_ids``.
    dist_func : str, optional
        Distance name forwarded to ``ranking``.

    Returns
    -------
    tuple
        ``(test_track_ids, ranking_result_list)`` where
        ``ranking_result_list[i]`` is the ranking returned for query ``i``.

    Notes
    -----
    With no queries the accuracy is reported as ``nan`` instead of raising
    ``ZeroDivisionError``. A query whose ranking comes back empty is counted
    as incorrect instead of raising ``IndexError``.
    """
    correct_ranking = 0
    ranking_result_list = []
    for i, query_track_id in enumerate(test_track_ids):
        ranking_result = ranking(test_embedding[i], all_embedding, all_track_ids, dist_func)
        ranking_result_list.append(ranking_result)

        # Guard against an empty ranking (e.g. an empty document collection).
        if len(ranking_result) > 0 and ranking_result[0] == query_track_id:
            correct_ranking += 1

    num_queries = len(test_track_ids)
    accuracy = correct_ranking / num_queries if num_queries else float('nan')
    print ('Accuracy is {}.'.format(accuracy))
    return test_track_ids, ranking_result_list

In [17]:
# Feature type under evaluation. A later cell selects chroma_extraction_func
# when this is "chroma" and mfcc_extraction_func for any other value.
feature_name = "mfcc"
# NOTE(review): 5900 is hard-coded; presumably a training step / checkpoint
# number -- confirm. The prefix here is spelled "sec2sec" while the result
# pickles written at the end of the notebook use "seq2seq"; verify this
# matches the file name on disk.
model_file_name = "{}/sec2sec_{}_{}.pkl".format(hps.model_dir, feature_name, 5900)
# Pickle holding the embeddings of the full collection (read below).
all_embedding_file_name = "{}_embedding_vector.pkl".format(feature_name)
# Pickle that will hold the noisy test features (written by a later cell).
test_data_file_name = "test_data_{}.pkl".format(feature_name)

# Load all embedding vectors
all_embedding = load_pickle(all_embedding_file_name)

In [25]:
# Create test data by adding random noise to randomly selected tracks.
# `small` is the boolean mask over `tracks` defined earlier, so the candidate
# pool is the index (track IDs) of the rows it selects.
all_track_ids = tracks.loc[small].index
# Any feature_name other than "chroma" falls back to the MFCC extractor.
feature_extraction_func = chroma_extraction_func if feature_name == "chroma" else mfcc_extraction_func
# NOTE(review): no random seed is set in the visible cells, so the sampled
# tracks and the added noise differ on every run.
test_track_ids, test_data_feature = create_test_data(all_track_ids, feature_extraction_func, noise_level=0.1, num_data=100)
# Only the features are written here; test_track_ids stays in memory until
# it is pickled after evaluation.
save_pickle(test_data_file_name, test_data_feature)

Creating test data: 69198
Creating test data: 104062
Creating test data: 98582
Creating test data: 32686
Creating test data: 29602
Creating test data: 110634
Creating test data: 62529
Creating test data: 27177
Creating test data: 54665
Creating test data: 111392
Creating test data: 12051
Creating test data: 113699
Creating test data: 128845
Creating test data: 11775
Creating test data: 78516
Creating test data: 90616
Creating test data: 55122
Creating test data: 59449
Creating test data: 21401
Creating test data: 111372
Creating test data: 40984
Creating test data: 124394
Creating test data: 107125
Creating test data: 24427
Creating test data: 86263
Creating test data: 55811
Creating test data: 42659
Creating test data: 81037
Creating test data: 68851
Creating test data: 98251
Creating test data: 148537
Creating test data: 69195
Creating test data: 60038
Creating test data: 80680
Creating test data: 86634
Creating test data: 111150
Creating test data: 130933
Creating test data: 121366


In [26]:
# Create test data embedding vectors from the saved model and the pickled
# test features (both passed by file name to the imported `embedding` helper).
test_embedding = embedding(model_file_name, test_data_file_name)

# Evaluate with both distance functions; each call prints its own accuracy.
# NOTE(review): the second call rebinds ranking_result_list, so only the
# 'norm' rankings are saved below -- confirm that dropping the 'cos' rankings
# is intended.
test_track_ids, ranking_result_list = evaluate(all_track_ids, all_embedding, test_track_ids, test_embedding, dist_func='cos')
test_track_ids, ranking_result_list = evaluate(all_track_ids, all_embedding, test_track_ids, test_embedding, dist_func='norm')
# Persist the query IDs and their rankings for later inspection.
save_pickle("seq2seq_test_track_ids.pkl", test_track_ids)
save_pickle("seq2seq_ranking_result_list.pkl", ranking_result_list)

  output, (hn, cn) = self.lstm(inputs, hidden_cell)
  output, (hn, cn) = self.lstm(inputs, hidden_cell)


Embedded 50 audios.
Embedded 100 audios.
Accuracy is 0.02.
Accuracy is 0.02.
