In [None]:
import sqlite3
import pandas as pd
import struct
import numpy as np
from collections import OrderedDict
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import re
import keras

### Vectorización de audio features

In [69]:
def _read_sql(db_path, query):
    """Run `query` against the SQLite file at `db_path` and return a DataFrame.

    The connection is closed even when the query raises, so a failing read
    does not leave an open handle behind.
    """
    conn = sqlite3.connect(db_path)
    try:
        return pd.read_sql_query(query, conn)
    finally:
        conn.close()


def load_data():
    """Load the features, genres, metadata and tags tables as DataFrames.

    Each table is read in full from its own SQLite file under ``dataset/``.
    As a side effect the ``models`` directory is created when it is missing.

    Returns:
        tuple: ``(features_df, genres_df, metadata_df, tags_df)``.
    """
    features_df = _read_sql("dataset/features.db", "SELECT * FROM features")
    genres_df = _read_sql("dataset/genres.db", "SELECT * FROM genres")
    metadata_df = _read_sql("dataset/metadata.db", "SELECT * FROM metadata")
    tags_df = _read_sql("dataset/tags.db", "SELECT * FROM tags")

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs("models", exist_ok=True)

    return features_df, genres_df, metadata_df, tags_df

# Load the four tables once; later cells read these module-level frames.
features_df, genres_df, metadata_df, tags_df = load_data()

In [70]:
def decode_tempo(b):
    """Decode a stored tempo value into a float.

    Accepted encodings:
      * a real number (Python or NumPy int/float) - returned as ``float``;
      * exactly 4 raw bytes - unpacked as one native-byte-order float32;
      * a comma-separated string of byte values (e.g. ``"0,0,192,63"``) -
        converted to bytes and unpacked the same way.

    Returns:
        float: the tempo, or ``np.nan`` when the value has an unsupported
        type/length or decoding fails (failures are reported on stdout).
    """
    try:
        # Integers and NumPy scalars used to fall through to NaN silently.
        # bool is excluded on purpose: it is an int subclass but not a tempo.
        if isinstance(b, (int, float, np.integer, np.floating)) and not isinstance(b, bool):
            return float(b)
        if isinstance(b, (bytes, bytearray)) and len(b) == 4:
            return struct.unpack('f', b)[0]  # 'f' = float32 (4 bytes)
        if isinstance(b, str) and ',' in b:
            byte_list = list(map(int, b.split(',')))
            b = bytes(byte_list)
            return struct.unpack('f', b)[0]
    except Exception as e:
        # Best effort: report the bad value and fall back to NaN below.
        print(f"⚠️ decode_tempo falló con {repr(b)}: {e}")
    return np.nan

def decode_array(b):
    """Interpret a binary blob as a float32 array.

    Returns the result of ``np.frombuffer(b, dtype=np.float32)``. If the value
    cannot be read as a float32 buffer it is handed back unchanged.
    """
    try:
        decoded = np.frombuffer(b, dtype=np.float32)
    except Exception:
        # Best effort: leave values that are not float32 buffers as they are.
        return b
    return decoded

def concat_full_vector():
    """Build one flat float32 vector per row from features, genres and tags.

    The module-level ``features_df``, ``genres_df`` and ``tags_df`` are joined
    column-wise (``pd.concat`` with ``axis=1``), repeated column names are kept
    once, ``clip_id``/``mp3_path`` are removed and rows containing any missing
    value are discarded. The encoded columns are then decoded with
    ``decode_tempo`` / ``decode_array``.

    Layout of each vector:
    ``stft_mean + mfcc_mean + chroma_mean + [tempo] + every remaining column``.

    Returns:
        np.ndarray: the stacked vectors, one row per kept input row.
    """
    encoded_cols = ['mfcc_mean', 'tempo', 'stft_mean', 'chroma_mean']

    merged = pd.concat([features_df, genres_df, tags_df], axis=1)
    merged = (
        merged.loc[:, ~merged.columns.duplicated()]
        .drop(columns=['clip_id', 'mp3_path'])
        .dropna()
        .assign(
            tempo=lambda frame: frame['tempo'].apply(decode_tempo),
            mfcc_mean=lambda frame: frame['mfcc_mean'].apply(decode_array),
            chroma_mean=lambda frame: frame['chroma_mean'].apply(decode_array),
            stft_mean=lambda frame: frame['stft_mean'].apply(decode_array),
        )
    )
    # Everything that is not one of the encoded columns goes at the end.
    remaining_cols = [name for name in merged.columns if name not in encoded_cols]

    def build_vector(row):
        tempo = np.array([row['tempo']], dtype=np.float32)
        extras = row[remaining_cols].values.astype(np.float32)
        return np.concatenate(
            [row['stft_mean'], row['mfcc_mean'], row['chroma_mean'], tempo, extras]
        )

    return np.stack([build_vector(row) for _, row in merged.iterrows()])

def concat_vector():
    """Build one flat float32 vector per row of the features table only.

    Works on the module-level ``features_df``: ``clip_id``/``mp3_path`` are
    removed, rows containing any missing value are discarded, and the encoded
    columns are decoded with ``decode_tempo`` / ``decode_array``.

    Layout of each vector: ``stft_mean + mfcc_mean + chroma_mean + [tempo]``.

    Returns:
        np.ndarray: the stacked vectors, one row per kept input row.
    """
    decoded = (
        features_df
        .drop(columns=['clip_id', 'mp3_path'])
        .dropna()
        .assign(
            tempo=lambda frame: frame['tempo'].apply(decode_tempo),
            mfcc_mean=lambda frame: frame['mfcc_mean'].apply(decode_array),
            chroma_mean=lambda frame: frame['chroma_mean'].apply(decode_array),
            stft_mean=lambda frame: frame['stft_mean'].apply(decode_array),
        )
    )

    parts_per_row = zip(
        decoded['stft_mean'], decoded['mfcc_mean'], decoded['chroma_mean'], decoded['tempo']
    )
    return np.stack([
        np.concatenate([stft, mfcc, chroma, np.array([tempo], dtype=np.float32)])
        for stft, mfcc, chroma, tempo in parts_per_row
    ])

# Build both vector sets, then report the matrix shape and the per-row shape of each.
vectors = concat_vector()
full_vectors = concat_full_vector()
print(
    vectors.shape,
    vectors[0].shape,
    full_vectors.shape,
    full_vectors[0].shape,
    sep="\n",
)

(17783, 1051)
(1051,)
(17783, 1146)
(1146,)


### Normalización

In [71]:
# Each matrix gets its own scaler. Re-fitting a single instance would discard
# the mean/scale learned from `vectors`, so new feature-only vectors could no
# longer be standardized consistently with X.
# NOTE(review): both scalers are fitted on all rows before the train/test
# split, so test-set statistics leak into the scaling - confirm this is intended.
feature_scaler = StandardScaler()
X = feature_scaler.fit_transform(vectors)

# `scaler` keeps the state it had before this change: fitted on `full_vectors`.
scaler = StandardScaler()
X_full = scaler.fit_transform(full_vectors)

In [72]:
# Hold out 20% of the rows of each matrix; the fixed random_state makes the
# split repeatable across runs.
# NOTE(review): both calls share the seed, so they presumably select the same
# rows when X and X_full have the same number of rows - confirm.
X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)
X_full_train, X_full_test = train_test_split(X_full, test_size=0.2, random_state=42)

### Importación de autoencoders

In [73]:
@keras.saving.register_keras_serializable()
class Sampling(tf.keras.layers.Layer):
    """Sampling layer for the VAE.

    Given ``(z_mean, z_log_var)`` it returns
    ``z_mean + exp(0.5 * z_log_var) * epsilon``, where ``epsilon`` is drawn
    with ``tf.random.normal`` and has shape ``(batch, dim)`` taken from
    ``z_mean``.
    """
    def call(self, inputs):
        # `inputs` is unpacked as a (z_mean, z_log_var) pair.
        z_mean, z_log_var = inputs
        # Shapes are read at run time; indexing [0] and [1] means z_mean is
        # expected to have at least two axes.
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        # exp(0.5 * log-variance) is the standard deviation that scales the noise.
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [74]:
def load_models():
    """Load the four saved encoder models from the ``models`` directory.

    The two VAE encoders are loaded with ``Sampling`` supplied through
    ``custom_objects``; the two plain encoders are loaded without it.

    Returns:
        tuple: ``(encoder, encoder_full, vae_encoder, vae_full_encoder)``.
    """
    load = tf.keras.models.load_model
    plain_paths = ("models/encoder.keras", "models/encoder_full.keras")
    vae_paths = ("models/vae_encoder.keras", "models/vae_full_encoder.keras")
    vae_objects = {'Sampling': Sampling}

    # Same loading order as the return order: plain encoders first, then VAEs.
    loaded = [load(path) for path in plain_paths]
    loaded += [load(path, custom_objects=vae_objects) for path in vae_paths]
    return tuple(loaded)

In [75]:
def load_encoded_data(vectors, full_vectors):
    """Run both input matrices through the saved encoders.

    Args:
        vectors: input passed to ``encoder`` and ``vae_encoder``.
        full_vectors: input passed to ``encoder_full`` and ``vae_full_encoder``.

    Returns:
        tuple: the ``predict`` outputs in the order
        ``(encoder, encoder_full, vae_encoder, vae_full_encoder)``.
    """
    models = load_models()
    # load_models returns (plain, plain_full, vae, vae_full); pair each model
    # with the matrix it was built for.
    batches = (vectors, full_vectors, vectors, full_vectors)
    return tuple(model.predict(batch) for model, batch in zip(models, batches))

# Encode the standardized matrices (X, X_full) with the four saved encoders.
encoded_vectors, encoded_vectors_full, vae_vectors_data, vae_vectors_data_full = load_encoded_data(X, X_full)

[1m556/556[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 867us/step
[1m556/556[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 881us/step
[1m556/556[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 891us/step
[1m556/556[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 968us/step


### Similitud entre vectores

In [76]:
def get_song_by_index(index):
    """Return ``clip_id`` and ``mp3_path`` of the row at position `index`.

    The position refers to the full ``features`` table as stored in
    ``dataset/features.db``, which is re-read on every call.
    NOTE(review): vectors built after ``dropna`` may not share these positions
    if any row was dropped - confirm before mapping embedding rows with this.

    Args:
        index: integer position, as accepted by ``DataFrame.iloc``.

    Returns:
        dict: ``{'clip_id': ..., 'mp3_path': ...}``.
    """
    conn = sqlite3.connect("dataset/features.db")
    try:
        # A distinct local name avoids shadowing the notebook-level features_df.
        table = pd.read_sql_query("SELECT * FROM features", conn)
    finally:
        # Close the handle even when the query fails.
        conn.close()
    return table.iloc[index][['clip_id', 'mp3_path']].to_dict()

#### K-Nearest Neighbors

In [77]:
def _extract_song_id(path):
	"""Extrae el identificador de la canción sin los tiempos del fragmento."""
	return re.sub(r"-\d+-\d+\.mp3$", "", path)

In [78]:
def recommend_songs_by_fragment_knn(fragment_index, embeddings, fragment_to_song, n_songs=5, search_pool=50, n_jobs=6):
    """Recommend up to `n_songs` distinct songs from the nearest fragments.

    Neighbours of the base fragment are visited from nearest to farthest.
    Fragments of the base song are skipped, and each other song is kept once,
    with the distance of its closest fragment.

    Args:
        fragment_index: row of the base fragment in `embeddings`.
        embeddings: 2-D array of fragment embeddings, one row per fragment.
        fragment_to_song: sequence mapping fragment position -> song id.
        n_songs: number of distinct songs wanted.
        search_pool: neighbours requested at first. It is capped at the number
            of fragments and doubled until `n_songs` songs are found or every
            fragment has been searched.
        n_jobs: parallel jobs passed to ``NearestNeighbors``.

    Returns:
        list[tuple]: ``(song_id, distance)`` pairs ordered by distance. It may
        hold fewer than `n_songs` entries when the data has fewer other songs.
    """
    n_fragments = len(embeddings)
    # sklearn rejects n_neighbors larger than the number of fitted samples.
    pool = min(search_pool, n_fragments)

    knn = NearestNeighbors(n_neighbors=pool, algorithm='ball_tree', n_jobs=n_jobs, metric='minkowski')
    knn.fit(embeddings)

    query = embeddings[fragment_index].reshape(1, -1)
    base_song_id = fragment_to_song[fragment_index]

    while True:
        distances, indices = knn.kneighbors(query, n_neighbors=pool)

        # Rebuilt on every pass: neighbours come back sorted by distance, so a
        # larger pool only appends farther candidates.
        seen_songs = OrderedDict()
        for dist, idx in zip(distances[0], indices[0]):
            song_id = fragment_to_song[idx]
            if song_id == base_song_id:
                continue
            if song_id not in seen_songs:
                seen_songs[song_id] = dist
            if len(seen_songs) >= n_songs:
                break

        if len(seen_songs) >= n_songs or pool >= n_fragments:
            break
        # Not enough distinct songs yet: widen the search as documented.
        pool = min(pool * 2, n_fragments)

    return list(seen_songs.items())


In [79]:
# Map every fragment row to its parent song (fragment path without the time suffix).
metadata_df['song_id'] = metadata_df['mp3_path'].apply(_extract_song_id)
index = 0
song = metadata_df.iloc[index]
print(f"Recomendaciones para el fragmento {index} -> {song['title']} | {song['artist']}:")

# NOTE(review): this assumes row i of `encoded_vectors` describes row i of
# `metadata_df` - confirm no rows were dropped when the vectors were built.
# `index` is passed (instead of a literal 0) so the header and the query agree.
for song_id, dist in recommend_songs_by_fragment_knn(index, encoded_vectors, metadata_df['song_id'].values):
    row = metadata_df[metadata_df['song_id'] == song_id].iloc[0]
    title = row['title']
    artist = row['artist']
    print(f"  | {title} | {artist} (distancia: {dist:.4f})")

Recomendaciones para el fragmento 0 -> BWV54 - I Aria | American Bach Soloists:
  | -BWV54 - III Aria- | American Bach Soloists (distancia: 1.8507)
  | Aria_ Diedi il core ad altra Ninfa | Philharmonia Baroque Orchestra (distancia: 2.1467)
  | Duet - Wenn kommst du mein Heil | American Bach Soloists (distancia: 2.2835)
  | Entree des Pelerins _ Air de furie _ Sarabande (Rameau_ Les Paladins) | Philharmonia Baroque (distancia: 2.2889)
  | Duet - Mein Freund ist mein | American Bach Soloists (distancia: 2.4094)
