In [1]:
import pandas as pd
import numpy as np
from urllib.parse import unquote
import os
from contextlib import suppress
import warnings
warnings.filterwarnings("ignore")
from sklearn.preprocessing import StandardScaler
import pickle  
from sklearn.metrics.pairwise import cosine_similarity

# Load the pre-computed artefacts shipped in dist/.
# NOTE: pickle.load can execute arbitrary code, so only load files from a trusted source.

# Mapping of feature-group name -> column names belonging to that group.
with open('dist/dict_columns_groups.pkl', 'rb') as groups_file:
    dict_columns_groups = pickle.load(groups_file)

# Trained classifier used further down to predict the cluster of a new song.
with open('dist/lgbm_clf.pkl', 'rb') as model_file:
    LightGBM = pickle.load(model_file)

In [2]:
# Nuestra librería de funciones
from commons import *

In [100]:
# Read the feature table (one row per song) and show its dimensions.
data_midi = pd.read_csv('data_midi_drop.csv', sep=',')
data_midi.shape

(88359, 509)

In [101]:
# Flatten every column name that belongs to some group of dict_columns_groups.
colums_categorized = [
    column
    for group_columns in dict_columns_groups.values()
    for column in group_columns
]

# Columns of data_midi that no group claims, in their original order.
columns_not_categorized = [
    column for column in data_midi.columns if column not in colums_categorized
]

# Identifier / label columns are not features.
# list.remove raises ValueError if one of them is not in the list.
for meta_column in ('tema', 'path', 'indice', 'Cluster'):
    columns_not_categorized.remove(meta_column)

In [102]:
def cosine_similarity_row(X__sc, vec_a,indice):
    """Rank every row of ``X__sc`` by its cosine similarity to ``vec_a``.

    Args:
        X__sc: 2-D array-like, one candidate vector per row.
        vec_a: 1-D vector to compare against every row of ``X__sc``.
        indice: labels for the rows of ``X__sc``; used as the index of the result.

    Returns:
        pandas.Series of similarity scores indexed by ``indice``,
        sorted from most to least similar.
    """
    # A single call compares vec_a (as a one-row matrix) with all rows at once.
    scores = cosine_similarity([vec_a], X__sc).ravel()
    ranked = pd.Series(scores, index=indice)
    return ranked.sort_values(ascending=False)

In [103]:
# Index the catalogue by song identifier while keeping 'indice' as a regular column.
# Done in place to avoid duplicating the large feature table in memory.
data_midi.set_index(data_midi.indice,inplace=True,drop=False)

# Explicit copy: new columns are added to data_index below, and writing into a
# bare column selection of data_midi is the chained-assignment pattern pandas warns about.
data_index = data_midi[['indice', 'Cluster']].copy()

# Missing feature values are treated as 0 before scaling.
data_midi.fillna(0, inplace=True)
X = data_midi.drop(['tema','indice','path', 'Cluster'],axis=1)

# Standardise every feature column; `sc` is reused later to transform new songs.
sc = StandardScaler()
X__sc = sc.fit_transform(X)
del data_midi  # release the raw table, only X / data_index are needed from here on

# Scaled features, indexed by song identifier, plus the cluster label of each song.
df_scaled = pd.DataFrame(X__sc, columns=X.columns).set_index(data_index['indice'])
df_scaled['Cluster'] = data_index['Cluster']

# Lower-cased lookup keys: one with '-' and '_' turned into spaces (free-text search)
# and one that only changes the case (exact match).
data_index['indice_lower_search'] = (
    data_index['indice']
    .str.lower()
    .str.replace('-', ' ', regex=False)
    .str.replace('_', ' ', regex=False)
)
data_index['indice_lower'] = data_index['indice'].str.lower()
data_index = data_index.drop(columns='Cluster')

In [8]:
# Start our API
import requests
from flask import  Flask, request, jsonify, render_template, send_file, abort
try:
    from flask import safe_join
except ImportError:
    # flask.safe_join was removed in Flask 2.1; Werkzeug provides the replacement.
    # Unlike the Flask version it returns None (instead of raising) for unsafe paths.
    from werkzeug.utils import safe_join
app = Flask('recomendar canciones')

In [None]:
## B
@app.route("/search_titles",methods=['GET'])
def search_titles(data_index=data_index, sc=sc):
    """Return one page of song identifiers whose title contains the query text.

    Query parameters:
        search: text to look for (case-insensitive; '-' and '_' count as spaces).
        page_number: zero-based page to return.
        cant_resultados: number of results per page.

    Args:
        data_index: lookup table with the 'indice' and 'indice_lower_search' columns.
        sc: unused; kept so the signature stays unchanged.

    Returns:
        JSON object built from the matching 'indice' values of the requested page,
        with the Access-Control-Allow-Origin header set to '*'.
    """
    search=request.args['search']
    page_number =int(request.args['page_number'])
    cant_resultados = int(request.args['cant_resultados'])

    # Normalise the query the same way 'indice_lower_search' was built, otherwise a
    # query containing '-' or '_' could never match.
    needle = search.lower().replace('-', ' ').replace('_', ' ')

    page_from = page_number*cant_resultados
    page_to = page_from + cant_resultados
    print(page_from,page_to)

    # regex=False: the query is user input and must be matched literally, so that
    # characters such as '(' or '[' do not raise a regex error.
    matches = data_index['indice_lower_search'].str.contains(needle, regex=False)
    result = data_index.loc[matches, 'indice'].iloc[page_from:page_to].to_dict()
    response = jsonify(result)
    # Enable Access-Control-Allow-Origin
    response.headers.add("Access-Control-Allow-Origin", "*")
    return response

In [None]:
## Returns the difference of every parameter between two songs. Helps interpret the results generated by get_related_songs()
@app.route("/similarity_songs",methods=['GET'])
def get_simil_songs():
    """Explain, column by column, how two songs differ after weighting.

    Query parameters:
        song1, song2: labels of the two songs in the index of ``df_scaled``.
        duracion_notas, amplitud_tonal, ritmica_instrument, ritmica_drums, armonia,
        dinamica, instrumentacion, tempo, notas_simultaneas, duracion_tema:
            float weight applied to the columns of the matching feature group(s).
        others: float weight applied to the columns that belong to no group.
        penalizacion: float; ``1 / 10**penalizacion`` is added to every weight.
        filtrar: 'true' (case-insensitive) keeps only the columns whose values,
            summed over both songs, are positive.

    Returns:
        Response whose body is a JSON object mapping column name to the absolute
        weighted difference between the two songs, largest first. Answers 404
        when either song is unknown.
    """
    args = request.args
    song1 = args['song1']
    song2 = args['song2']
    filtrar = args['filtrar']
    others = float(args['others'])
    penalizacion = float(args['penalizacion'])

    # Feature group (key of dict_columns_groups) -> query parameter holding its weight.
    group_to_param = {'instrumentacion': 'instrumentacion',
                      'ritmica_drums': 'ritmica_drums',
                      'ritmica_instrument': 'ritmica_instrument',
                      'amplitud_tonal': 'amplitud_tonal',
                      'dinamica': 'dinamica',
                      'duracion_notas1': 'duracion_notas',
                      'duracion_notas2': 'duracion_notas',
                      'notas_simultaneas': 'notas_simultaneas',
                      'tempo': 'tempo',
                      'duracion_tema': 'duracion_tema',
                      'armonia1': 'armonia',
                      'armonia2': 'armonia',
                      'armonia3': 'armonia',
                      'armonia4': 'armonia',
                      'armonia5': 'armonia'}
    dict_key_values = {group: float(args[param]) for group, param in group_to_param.items()}

    if song1 not in df_scaled.index or song2 not in df_scaled.index:
        abort(404)

    # Only these two rows are needed, so select them before weighting instead of
    # copying and re-weighting the whole table on every request. A label may occur
    # more than once in the index; the first occurrence of each song is used so the
    # comparison is always song1 vs song2.
    data = pd.concat([df_scaled.loc[[song]].iloc[:1] for song in (song1, song2)])

    # The penalty sets the order of magnitude of the minimum weight:
    # penalizacion=1 gives 0.1, penalizacion=2 gives 0.01, and so on.
    # As a result, groups weighted 0 keep a small, non-zero relevance.
    min_sim_distance = 1 / 10**penalizacion

    # Each group is divided by sqrt(group size) so large groups do not dominate.
    for group, weight in dict_key_values.items():
        group_columns = dict_columns_groups[group]
        cant_keys = len(group_columns)
        for column in group_columns:
            data[column] = (data[column] / np.sqrt(cant_keys)) * (weight + min_sim_distance)

    # The ungrouped columns are normalised by their own count (previously the size
    # of the last group processed above leaked into this loop).
    cant_others = len(columns_not_categorized)
    for column in columns_not_categorized:
        data[column] = (data[column] / np.sqrt(cant_others)) * (others + min_sim_distance)

    # Same case-insensitive flag handling as /related_songs.
    if filtrar.lower() == 'true':
        data = data.loc[:, data.sum(axis=0) > 0]

    # Second row of diff() is song2 - song1 for every remaining column.
    df_diff = data.diff().abs().iloc[1].sort_values(ascending=False)

    response = app.make_response(df_diff.to_json())
    # Enable Access-Control-Allow-Origin, like the other endpoints
    response.headers.add("Access-Control-Allow-Origin", "*")
    return response

In [None]:
## Returns a dictionary with the {{cant_resultados}} songs most similar to the requested one, based on the received parameters; supports paging
@app.route("/related_songs",methods=['GET'])
def get_related_songs(df_scaled=df_scaled, sc=sc):
    """Rank the catalogue by weighted cosine similarity to one song.

    Query parameters:
        search: song identifier (matched case-insensitively against
            'indice_lower'), or 'false' to use the song returned by
            ``get_midi_from_path()`` instead of a catalogue entry.
        duracion_notas, amplitud_tonal, ritmica_instrument, ritmica_drums, armonia,
        dinamica, instrumentacion, tempo, notas_simultaneas, duracion_tema:
            float weight applied to the columns of the matching feature group(s).
        others: float weight applied to the columns that belong to no group.
        penalizacion: float; ``1 / 10**penalizacion`` is added to every weight.
        clusterizar: 'true' restricts the candidates to the song's cluster.
        filtrar: 'true' keeps only the columns where the searched song is positive.
        cant_resultados, page_number: page size and zero-based page number.
        predict: no longer read; clients may still send it.

    Args:
        df_scaled: scaled feature table indexed by song, with a 'Cluster' column.
        sc: fitted scaler used to transform a new song.

    Returns:
        JSON response ``{'path': {...}, 'value': {...}}`` with the songs of the
        requested page and their similarity, best first. Answers 404 when the
        requested song is not in the catalogue.
    """
    args = request.args
    search = args['search']
    filtrar = args['filtrar']
    clusterizar = args['clusterizar']
    cant_resultados = int(args['cant_resultados'])
    page_number = int(args['page_number'])
    others = float(args['others'])
    penalizacion = float(args['penalizacion'])

    # Feature group (key of dict_columns_groups) -> query parameter holding its weight.
    group_to_param = {'instrumentacion': 'instrumentacion',
                      'ritmica_drums': 'ritmica_drums',
                      'ritmica_instrument': 'ritmica_instrument',
                      'amplitud_tonal': 'amplitud_tonal',
                      'dinamica': 'dinamica',
                      'duracion_notas1': 'duracion_notas',
                      'duracion_notas2': 'duracion_notas',
                      'notas_simultaneas': 'notas_simultaneas',
                      'tempo': 'tempo',
                      'duracion_tema': 'duracion_tema',
                      'armonia1': 'armonia',
                      'armonia2': 'armonia',
                      'armonia3': 'armonia',
                      'armonia4': 'armonia',
                      'armonia5': 'armonia'}
    dict_key_values = {group: float(args[param]) for group, param in group_to_param.items()}

    data = df_scaled
    if search.lower() == 'false':
        # New song. This branch used to be unreachable because the catalogue lookup
        # ran first and failed for search='false'.
        # Presumably get_midi_from_path() returns a one-row DataFrame with a 'tema'
        # column plus feature columns — TODO confirm against commons.
        feature_columns = df_scaled.columns.drop('Cluster')
        uploaded = get_midi_from_path().iloc[:1]
        tema = str(uploaded['tema'].iloc[0])
        # Same columns, same order as the scaler was fitted on; absent features are 0.
        features = uploaded.reindex(columns=feature_columns).fillna(0)
        scaled = sc.transform(features)
        cluster = LightGBM.predict(scaled)[0]
        # Label the new row so it can be looked up (and reported) by path. The old
        # code assigned the index on a temporary slice, which had no effect.
        search_song = '..\\GET_FILE\\' + tema
        midi_df = pd.DataFrame(scaled, columns=feature_columns, index=[search_song])
        midi_df['Cluster'] = cluster
        data = pd.concat([data, midi_df])
    else:
        matches = data_index.loc[data_index['indice_lower'] == search.lower(), 'indice']
        if matches.empty:
            abort(404)
        search_song = matches.iloc[0]
        cluster = df_scaled.loc[[search_song], 'Cluster'].iloc[0]

    if (clusterizar.lower() == 'true'):
        data = data[data['Cluster'] == cluster]

    # Work on a private copy: the weighting below must never touch the shared table.
    data = data.copy()

    # The penalty sets the order of magnitude of the minimum weight
    # (penalizacion=1 gives 0.1, penalizacion=2 gives 0.01, ...).
    min_sim_distance = 1 / 10**penalizacion

    # Each group is divided by sqrt(group size) so large groups do not dominate.
    for group, weight in dict_key_values.items():
        group_columns = dict_columns_groups[group]
        cant_keys = len(group_columns)
        for column in group_columns:
            data[column] = (data[column] / np.sqrt(cant_keys)) * (weight + min_sim_distance)

    # The ungrouped columns are normalised by their own count (previously the size
    # of the last group processed above leaked into this loop).
    cant_others = len(columns_not_categorized)
    for column in columns_not_categorized:
        data[column] = (data[column] / np.sqrt(cant_others)) * (others + min_sim_distance)

    # A label can appear more than once in the index; use its first row.
    song = data.loc[search_song,:]
    if not isinstance(song, pd.Series):
        song = song.iloc[0]

    if (filtrar.lower() == 'true'):
        # Keep only the columns where the searched song is positive. The previous
        # code took `.index` of the comparison, i.e. every column, so nothing was
        # filtered. Skipped when no column qualifies to avoid empty vectors.
        positive = (song > 0).to_numpy()
        if positive.any():
            data = data.loc[:, positive]
            song = song[positive]

    # NOTE(review): the 'Cluster' label column is still part of the compared
    # vectors, exactly as before — confirm this is intended.
    similarity = cosine_similarity_row(data.to_numpy(), song.to_numpy(), data.index)
    page_from = page_number*cant_resultados
    page_to = page_from + cant_resultados
    result = similarity.iloc[page_from:page_to].reset_index()
    result.columns = ['path', 'value']
    response = jsonify(result.to_dict())
    # Enable Access-Control-Allow-Origin
    response.headers.add("Access-Control-Allow-Origin", "*")

    return response

In [None]:
## Serves a MIDI file given its path relative to FILES_PATH
app.config["FILES_PATH"] = "Full_MIDI"
@app.route("/get-file/<path:filename>")
def get_file(filename):
    """Send the requested file as an attachment.

    Args:
        filename: path of the file, relative to ``app.config["FILES_PATH"]``.

    Returns:
        The file as a download response with the Access-Control-Allow-Origin
        header set to '*'. Answers 404 when the path is rejected by safe_join
        or the file does not exist.
    """
    safe_path = safe_join(app.config["FILES_PATH"], filename)
    # Werkzeug's safe_join returns None (rather than raising) for a path that
    # would escape the base directory; never hand that to send_file.
    if safe_path is None:
        abort(404)
    try:
        response = send_file(safe_path, as_attachment=True)
    except FileNotFoundError:
        abort(404)
    response.headers.add("Access-Control-Allow-Origin", "*")
    return response

In [None]:
# Start the Flask server, listening on every network interface of this machine.
app.run(host="0.0.0.0")