Project name: DJ-Running

Authors: Jorge García de Quirós, Sandra Baldassarri, Pedro Álvarez

Affiliation/Institution: Computer Science and Systems Engineering Department, University of Zaragoza (Spain)

Paper: RIADA: a machine-learning based infrastructure for recognising the emotions of the Spotify songs

Date: October, 2020

In [3]:
# feature_selection_aprox: array of scikit-learn feature-selection score functions
# X: Features of the dataset
# y: Labels of the dataset
# features: names of the features for each X column
# returns the feature names sorted in increasing order of importance
from sklearn.feature_selection import SelectKBest

def findWorstFeatures (feature_selection_aprox, X, y, features):
    """Rank the feature names from least to most frequently selected.

    For every score function in ``feature_selection_aprox`` a SelectKBest
    selector is fitted for each k in 1 .. (number of columns - 1), and the
    number of times each column is kept is accumulated.

    feature_selection_aprox: iterable of score functions passed to SelectKBest
    X: feature matrix; the number of columns is read from ``len(X[0])``
    y: labels passed unchanged to ``SelectKBest.fit``
    features: names of the features, one per X column

    Returns the feature names sorted by ascending selection count (ties are
    ordered by the name itself, because (count, name) pairs are sorted).
    """
    n_features = len(X[0])
    # Total number of times each column was selected
    votes = [0] * n_features
    for score_func in feature_selection_aprox:
        for k in range(1, n_features):
            selector = SelectKBest(score_func, k=k)
            selector.fit(X, y)
            # Support mask: truthy where the column was selected, falsy otherwise
            votes = votes + selector.get_support()
    ranked = sorted(zip(votes, features))
    return [name for _, name in ranked]

In [4]:
class Model:
    """Simple record grouping a fitted model with its features and score.

    In this file it is populated by findBestModel with the kept feature
    names, the best estimator found and its best cross-validation score.
    """

    def __init__(self, features, model, score):
        # Values are stored exactly as given; no validation is performed.
        self.features = features
        self.model = model
        self.score = score

    def __str__(self):
        """Return a three-line summary: features, model and score."""
        # The attribute is ``score`` (set in __init__); reading ``_score``
        # raised AttributeError.
        return "Features: " + str(self.features) +  "\nModel: " + str(self.model)  + "\nScore: " + str(self.score)

In [1]:
#data_frame: pandas data frame
#models: [[model, hyper], [model2, hyper2]]
#feature_selection_methods: [fs1, fs2, ...]
#min_var: minimum number of variables
#max_var: maximum number of variables
#threshold: threshold considered to binarise the label (not a parameter of findBestModel)
#target: name of the target column of the model
#niter: number of iterations in the randomized search for hyperparameters

import pandas as pd

from sklearn.model_selection import RandomizedSearchCV

def findBestModel(data_frame, models, feature_selection_methods, min_var, max_var, target, niter):
    """Search, for each candidate model, the best feature subset and hyperparameters.

    data_frame: pandas data frame; every column except the last one is used
        as a feature
    models: sequence of [estimator, param_distributions] pairs
    feature_selection_methods: score functions forwarded to findWorstFeatures
    min_var, max_var: bounds used to derive how many of the worst-ranked
        features are dropped
    target: name of the label column
    niter: n_iter passed to RandomizedSearchCV

    Returns a list with one Model per entry of ``models`` holding the kept
    feature names, the best estimator and its best cross-validation score
    (3-fold, "f1_macro").
    """
    # One placeholder result per candidate model
    bestResults = [Model(None, None, 0) for _ in models]

    # Features are all columns but the last; labels come from the target column
    feature_columns = data_frame.columns[0:len(data_frame.columns)-1]
    feature_frame = data_frame.loc[:, feature_columns]
    X = feature_frame.values
    Y = data_frame.loc[:, target:target].dropna()

    # Feature names ordered from least to most important
    worst_features = findWorstFeatures(feature_selection_methods, X, Y, feature_columns)

    total_features = len(X[0])
    last_feature = len(data_frame.columns) - 1
    for position, candidate in enumerate(models):
        # NOTE(review): the upper bound is exclusive, so the subset with exactly
        # min_var features is never evaluated -- confirm this is intended.
        for drop_vars in range(total_features - max_var, total_features - min_var):
            # Remove the drop_vars least important features
            reduced_X = feature_frame.drop(worst_features[0:drop_vars], axis=1)

            random_search = RandomizedSearchCV(
                candidate[0],
                param_distributions=candidate[1],
                n_iter=niter,
                cv=3,
                random_state=0,
                scoring="f1_macro",
            )
            random_search.fit(reduced_X, Y)
            score = random_search.best_score_

            current = bestResults[position]
            if current.model is None or score > current.score:
                kept_features = worst_features[drop_vars: last_feature]
                bestResults[position] = Model(kept_features, random_search.best_estimator_, score)
    return bestResults

In [8]:
#Metrics
from sklearn.metrics import f1_score, precision_score, recall_score,accuracy_score

def printResults(results, X_train, X_test, y_train, y_test):
    """Refit every result's model and print its test metrics.

    results: iterable of objects exposing ``model``, ``features`` and ``score``
    X_train, X_test: frames indexed with ``.loc[:, result.features]``
    y_train, y_test: labels for fitting and for scoring

    Each model is refitted in place on the training data restricted to its own
    features. Nothing is returned; the report is written to standard output.
    """
    for result in results:
        # Single-argument print(...) calls behave identically under Python 2
        # and Python 3; the former ``print (...) % x`` form only worked as a
        # Python 2 print statement.
        print("--------------------------------------------------------------------")
        print(str(result.model))
        print("Features:" + str(result.features))
        result.model.fit(X_train.loc[:, result.features], y_train)
        y_pred = result.model.predict(X_test.loc[:, result.features])
        print("F1_test: %0.4f" % f1_score(y_test, y_pred, average="macro"))
        # The value printed here is the score stored on the result object
        print("F1_train: %0.4f" % result.score)
        print("Precision: %0.4f" % precision_score(y_test, y_pred, average="macro"))
        print("Recall: %0.4f" % recall_score(y_test, y_pred, average="macro"))
        print("Accuracy: %0.4f" % accuracy_score(y_test, y_pred))

In [4]:
from joblib import dump

def saveModels (results, folder, name):
    """Persist each result's model and feature list with joblib.

    Two files are written per result under ``folder``:
    ``model_<name>_<ModelClass>.joblib`` and
    ``features_<name>_<ModelClass>.joblib``.
    """
    for result in results:
        estimator = result.model
        # Shared tail of both file names: "<name>_<ModelClass>.joblib"
        stem = name + '_' + estimator.__class__.__name__ + '.joblib'
        dump(estimator, folder + '/model_' + stem)
        dump(result.features, folder + '/features_' + stem)

In [7]:
from joblib import load

def loadModels (folder, names):
    """Load the joblib files of several stored models.

    folder: directory holding the files; it may or may not end with a path
        separator
    names: iterable of model names; for each one the files
        ``model_<name>.joblib`` and ``features_<name>.joblib`` are read

    Returns a dict mapping each name to ``[model, features]``.
    """
    # Local import keeps this cell self-contained.
    import os

    models = {}
    for name in names:
        # os.path.join inserts the separator only when it is missing, so the
        # same folder string works here and in saveModels (which adds '/').
        model_path = os.path.join(folder, "model_" + name + ".joblib")
        features_path = os.path.join(folder, "features_" + name + ".joblib")
        models[name] = [load(model_path), load(features_path)]
    return models

In [6]:
def printComparation(model, features, X, y):
    """Print macro F1, precision, recall and accuracy of ``model`` on (X, y).

    model: fitted estimator exposing ``predict``
    features: column labels selected from X with ``.loc``
    X: frame holding at least the ``features`` columns
    y: true labels compared against the predictions

    Nothing is returned; the report is written to standard output.
    """
    y_pred = model.predict(X.loc[:, features])
    # Single-argument print(...) calls behave identically under Python 2 and
    # Python 3; ``print (...) % x`` only worked as a Python 2 print statement.
    print("F1_test: %0.4f" % f1_score(y, y_pred, average="macro"))
    print("Precision: %0.4f" % precision_score(y, y_pred, average="macro"))
    print("Recall: %0.4f" % recall_score(y, y_pred, average="macro"))
    print("Accuracy: %0.4f" % accuracy_score(y, y_pred))

In [9]:
#calculateLabel(["SA_GE_happy_RandomForestClassifier","SA_GE_sad_RandomForestClassifier"],
#"/home/jorge/djrunning/experimentos_py/best_mood_classifier/1v3/",'/home/jorge/djrunning/datasets/new_dataset/mf_big_2.csv',
#'/home/jorge/djrunning/datasets/new_dataset/mf_big_2.csv')

from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn import preprocessing

import pandas as pd
import numpy as np
    
def calculateLabel(model_names, url, url_dataset, url_normalization):
    """Score a dataset with several stored classifiers.

    model_names: names forwarded to loadModels
    url: folder forwarded to loadModels
    url_dataset: CSV with the songs to score
    url_normalization: CSV used to fit the min-max scaler

    Both CSV files are sliced by label from column 'valence' to column 'mode'.

    Returns a DataFrame with one column per loaded model (named after the
    model) holding column 1 of that model's ``predict_proba`` output.
    """
    # Fit the normalizer on the reference data
    o_model = pd.read_csv(url_normalization)
    min_max_scaler = preprocessing.MinMaxScaler()
    min_max_scaler.fit(o_model.loc[:,'valence':'mode'])

    # Import the dataset and normalize it with the fitted scaler
    dataset = pd.read_csv(url_dataset)
    aux_X = min_max_scaler.transform(dataset.loc[:,'valence':'mode'])
    dataset_norm = pd.DataFrame(aux_X)
    # transform() returns a bare array, so the column labels are restored here
    dataset_norm.columns=[u'valence', u'energy', u'liveness', u'tempo', u'speechiness',
       u'acousticness', u'instrumentalness', u'danceability', u'key',
       u'duration_ms', u'loudness', u'mode']

    models = loadModels (url, model_names)
    y_pred = []
    new_names = []
    for model_name in models:
        model, features = models[model_name]
        y_pred.append(model.predict_proba(dataset_norm.loc[:, features])[:, 1])
        new_names.append(model_name)

    # One column per model; column_stack replaces the np.matrix + transpose pair
    resul = pd.DataFrame(np.column_stack(y_pred))
    resul.columns = new_names

    return resul

In [3]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

def search_feat_df(df, features, batch_size=50):
    """Fetch audio features for every track id in ``df``.

    df: frame with an 'id' column holding the track ids
    features: column labels kept from each audio-features response
    batch_size: number of ids sent per ``SP.audio_features`` call
        (defaults to 50, the size originally hard-coded)

    Returns a DataFrame with the ``features`` columns of every track for
    which a non-empty response entry was received, or an empty DataFrame
    when nothing was fetched.
    """
    track_ids = list(df['id'])
    chunks = []
    for start in range(0, len(track_ids), batch_size):
        batch = track_ids[start:start + batch_size]
        # Empty (falsy) entries in the response are discarded
        found = [track for track in SP.audio_features(batch) if track]
        if not found:
            # Nothing usable in this batch; selecting columns from an empty
            # frame would raise KeyError.
            continue
        chunks.append(pd.DataFrame(found)[features])

    if not chunks:
        return pd.DataFrame()
    # DataFrame.append was removed in pandas 2.0; concat keeps the same
    # row order and per-batch indices.
    return pd.concat(chunks)