## Tesis

In [None]:
import re
import nltk
import spacy
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
import pandas as pd
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import numpy as np
from tqdm import tqdm
import torch
import faiss
from sklearn.preprocessing import normalize
import math
import ast

### 1) Tratamiento de las letras

In [None]:
df_en = pd.read_csv('canciones_ingles.csv')

In [None]:
# Descargar stopwords de NLTK (solo necesario la primera vez)
nltk.download('stopwords')

# Cargar modelo de spaCy para inglés
nlp_en = spacy.load("en_core_web_sm")

In [None]:
def limpiar_texto(texto):
    texto = BeautifulSoup(texto, "html.parser").get_text()
    texto = re.sub(r"[^a-zA-Z\s]", "", texto)  # solo letras y espacios
    texto = re.sub(r"\s+", " ", texto)  # quita saltos de línea, espacios dobles
    return texto.lower().strip()


def tokenizar_texto(texto):
    if isinstance(texto, str):
        return texto.split()
    else:
        return []
# Tokenización básica por espacios

from nltk.corpus import stopwords

stop_words = set(stopwords.words("english"))

def eliminar_stopwords(tokens):
    return [token for token in tokens if token not in stop_words]

from tqdm import tqdm
tqdm.pandas()

def lematizar_texto(tokens, nlp):
    if not isinstance(tokens, list):
        return ""
    
    texto = " ".join(tokens)  # Convertir lista de tokens a string
    doc = nlp(texto)
    lemas = [token.lemma_ for token in doc]
    
    return lemas

In [None]:
### Ojo: estas funciones exigen un gasto computacional alto, tanto de recursos como de tiempo (tenerlo presente antes de correrlo)

df_en["clean_lyrics"] = df_en["lyrics"].progress_apply(limpiar_texto)
df_en["tokens"] = df_en["clean_lyrics"].progress_apply(tokenizar_texto)
df_en["tokens_filtrados"] = df_en["tokens"].progress_apply(eliminar_stopwords)
df_en["lematizado"] = df_en["tokens_filtrados"].progress_apply(lambda x: lematizar_texto(x, nlp_en))

In [None]:
df_en.to_csv("letras_final.csv", index=False)

### 2) Vector de emociones y embeddings
(la base de datos necesaria para realizar las recomendaciones, aún no esta completa, por ende en esta sección se incluye todo lo que se va agregando a la base y su tratamiento)

In [None]:
canciones = pd.read_csv("letras_final.csv")

In [None]:
from transformers import pipeline

classifier = pipeline(
    "text-classification",
    model="bhadresh-savani/distilbert-base-uncased-emotion",
    return_all_scores=True,
    framework="pt"  
)
def obtener_vector_emociones(texto):
    resultado = classifier(texto[:512])  # Limitamos a 512 tokens por eficiencia
    emociones = {item['label']: item['score'] for item in resultado[0]}
    return emociones


In [None]:
### Ojo: esta función exigen un gasto computacional alto y de memoria, tanto de recursos como de tiempo (tenerlo presente antes de correrlo)

tqdm.pandas()

canciones["vector_emociones"] = canciones["clean_lyrics"].progress_apply(obtener_vector_emociones)

In [None]:
# Convierte cada diccionario en un vector siguiendo el orden fijo
canciones["emociones_vector"] = canciones["vector_emociones"].apply(
    lambda d: np.array([d[e] for e in emociones_orden])
)

IMPORTANTE: las emociones dadas por el modelo preentrenado vienen en este orden = ["sadness", "joy", "love", "anger", "fear", "surprise"]

In [None]:
modelo_embeddings = SentenceTransformer('sentence-transformers/distilbert-base-nli-mean-tokens')

canciones['texto_lemmatizado'] = canciones['lematizado'].apply(lambda x: ' '.join(x))


tqdm.pandas() 
canciones['embeddings'] = canciones['texto_lemmatizado'].progress_apply(lambda x: modelo_embeddings.encode(x))

# print(len(canciones['embeddings'][0])) 

In [None]:
canciones.to_csv("canciones_con_embeddings_completas.csv")

### 3) Creación de la valencia textual

In [None]:
canciones = pd.read_csv("canciones_con_embeddings_completas.csv")

In [None]:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from tqdm import tqdm
import numpy as np
from typing import List, Tuple, Dict, Any

# --- Cargar modelo de sentimiento ---
MODEL_NAME = "cardiffnlp/twitter-roberta-base-sentiment-latest"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer, return_all_scores=True)

# --- Mapeo automático de etiquetas ---
id2label = model.config.id2label
label2id = model.config.label2id
label_names = [id2label[i].lower() for i in range(len(id2label))]

if set(label_names) == {"label_0", "label_1", "label_2"}:
    # Asumimos: 0=neg, 1=neu, 2=pos
    idx_neg, idx_neu, idx_pos = 0, 1, 2
else:
    idx_neg = label2id.get("negative", 0)
    idx_neu = label2id.get("neutral", 1)
    idx_pos = label2id.get("positive", 2)

def _triple_from_out(out: List[Dict[str, Any]]) -> Tuple[float, float, float]:
    """
    Convierte la salida del pipeline en un triple (neg, neu, pos).
    """
    idx2score = {item["label"]: item["score"] for item in out}
    def get_score(idx): 
        return list(idx2score.values())[idx] if isinstance(idx, int) else idx2score.get(idx, 0.0)
    return get_score(idx_neg), get_score(idx_neu), get_score(idx_pos)

def infer_and_compute_valence(
    texts: List[str],
    batch: int = 32
) -> Tuple[List[float], np.ndarray, List[Any]]:
    """
    Procesa una lista de textos y devuelve:
      - valences: lista de valores valence = p_pos - p_neg
      - probs: matriz Nx3 con (neg, neu, pos)
      - raw_out: salida cruda del pipeline
    """
    all_probs = []
    valences = []
    raw_outs = []

    for i in tqdm(range(0, len(texts), batch), desc="Procesando lotes"):
        batch_texts = texts[i:i + batch]
        outs = pipe(batch_texts, truncation=True, padding=True)

        if isinstance(outs, dict):
            outs = [outs]

        raw_outs.extend(outs)

        for out in outs:
            p_neg, p_neu, p_pos = _triple_from_out(out)
            all_probs.append([p_neg, p_neu, p_pos])
            valences.append(p_pos - p_neg)

    return valences, np.array(all_probs), raw_outs

In [None]:
texts = canciones["clean_lyrics"]
texts = list(map(str, texts))  # Asegura lista de strings
max_length = 512
texts = [text[:max_length] for text in texts]

valences, probs, info = infer_and_compute_valence(texts, batch=32)

# Crear DataFrame
df = pd.DataFrame({
    'text': texts,
    'valence': valences,
    'p_neg': probs[:, 0],
    'p_neu': probs[:, 1],
    'p_pos': probs[:, 2]
})

canciones['valence'] = df['valence']
canciones['p_neg'] = df['p_neg']
canciones['p_neu'] = df['p_neu']
canciones['p_pos'] = df['p_pos']

In [None]:
canciones.to_csv('canciones_con_valencia.csv')

### 4) Algunas correciones 

Para que la base de datos quedé ajustada fue necesario realizar algunas correciones 


 - Reañadimos valencia musical a la base de datos

In [None]:
canciones = pd.read_csv("canciones_con_valencia.csv")
canciones2 = pd.read_csv("canciones_ingles.csv")
canciones['valence_musical'] = canciones2['valence']

- Codificamos variables categóricas

In [None]:
import pandas as pd

# Diccionario para notas musicales → números (0 a 11)
key_map = {
    'C': 0, 'C#': 1, 'D': 2, 'D#': 3, 'E': 4, 'F': 5,
    'F#': 6, 'G': 7, 'G#': 8, 'A': 9, 'A#': 10, 'B': 11
}

def convert_key(x):
    if isinstance(x, str):
        # Si es texto que representa número
        if x.isdigit():
            return int(x)
        # Si es nota musical
        elif x in key_map:
            return key_map[x]
    return x  # Dejar valores ya numéricos como están

canciones['key'] = canciones['key'].apply(convert_key)

# Para 'mode', igual: manejar texto y números
mode_map = {'minor': 0, 'major': 1}
canciones['mode'] = canciones['mode'].apply(lambda x: mode_map[x.lower()] if isinstance(x, str) and x.lower() in mode_map else x)

In [None]:
canciones_br = canciones[['id', 'name', 'album_name','texto_lemmatizado','duration_ms','emociones_vector','valence', 'valence_musical']]
canciones_br.to_csv("canciones_br.csv")

### 5)  Recomendaciones pt1 (Fue prueba, no significo para la base de recomendaciones)


In [None]:
canciones_br = pd.read_csv('canciones_br.csv')
canciones = pd.read_csv("canciones_ingles.csv")
canciones_br['artists'] = canciones['artists']

In [None]:
import numpy as np
import pandas as pd

def parse_vector(v):
    if isinstance(v, str):
        # Quitar corchetes y espacios extra
        v = v.strip().replace('[', '').replace(']', '')
        # Separar por espacios (hay dobles espacios a veces)
        parts = v.split()
        return np.array([float(x) for x in parts], dtype=np.float32)
    elif isinstance(v, (list, tuple, np.ndarray)):
        return np.array(v, dtype=np.float32)
    else:
        return np.nan

# Ejemplo de uso en tu DataFrame
canciones_br['emociones_vector'] = canciones_br['emociones_vector'].apply(parse_vector)

In [None]:
# Parámetros (ajusta según tu máquina)
EMBED_COL = 'emociones_vector'   # columna con los vectores
ID_COL = 'id'                    # columna con el id de la canción
K = 5                            # queremos las 5 más cercanas
BATCH_CONVERT = 20000            # batch para convertir a numpy (evita picos de memoria)
BATCH_SEARCH = 20000             # batch para hacer búsquedas (evita picos)
USE_INDEXFLAT = True             # intenta IndexFlatIP primero; si falla, cae a IVF
INDEX_PATH = "faiss_emociones.index"
OUT_DF_PATH = "df_con_vecinos.parquet"

# --- 1) Asegúrate de tener el DataFrame 'df' cargado ---
# df = pd.read_parquet("...")  # o como lo tengas cargado

# ---- 2) Convertir la columna de vectores a numpy.ndarray dtype=float32 ----
def to_array_safe(x):
    # Si ya es array o list, convertir; si es string, eval() o parsearlo
    if isinstance(x, str):
        # intenta eval (seguro si el formato es "[0.1, 0.2, ...]" o "array([...])")
        try:
            arr = eval(x)
        except Exception:
            # como fallback, intentar remover corchetes y split
            s = x.strip().lstrip('[').rstrip(']')
            parts = [p for p in s.split() if p not in [',']]
            arr = [float(p.replace(',','')) for p in parts]
        return np.array(arr, dtype=np.float32)
    elif isinstance(x, (list, tuple, np.ndarray)):
        return np.array(x, dtype=np.float32)
    else:
        # si hay NaN u otro, devolver vector nan (se manejará)
        return np.array([], dtype=np.float32)

In [None]:
# Convertir en batches para no explotar RAM
n = len(canciones_br)
sample_vec = None
vectors_list = []
idx_chunks = list(range(0, n, BATCH_CONVERT))

for start in tqdm(idx_chunks, desc="Convirtiendo vectors a numpy (batches)"):
    end = min(start + BATCH_CONVERT, n)
    chunk = canciones_br[EMBED_COL].iloc[start:end].values
    converted = []
    for v in chunk:
        arr = to_array_safe(v)
        converted.append(arr)
    # Verificar dimensión y pad / validar
    for i, arr in enumerate(converted):
        if arr.size == 0:
            raise ValueError(f"Vector vacío en fila {start + i}. Revisa la columna {EMBED_COL}.")
        if sample_vec is None:
            sample_vec = arr
            dim = arr.shape[0]
        else:
            if arr.shape[0] != dim:
                raise ValueError(f"Dimensión inconsistente en fila {start + i}: {arr.shape[0]} vs {dim}")
    vectors_list.append(np.vstack(converted))

In [None]:
# Concatenar (esto produce la matriz completa; si no entra en RAM habría que procesar con índice incremental)
vectors = np.vstack(vectors_list).astype(np.float32)  # shape (n, d)
del vectors_list

# ---- 3) Normalizar vectores (para usar inner product como coseno) ----
vectors = normalize(vectors, axis=1).astype(np.float32)

d = vectors.shape[1]
print(f"Num vectores: {vectors.shape[0]}, Dimensión: {d}")

In [None]:
# ---- 4) Crear índice FAISS ----
index = None
try:
    if USE_INDEXFLAT:
        print("Creando IndexFlatIP (exacto, inner product)...")
        index = faiss.IndexFlatIP(d)   # inner product; con vectores normalizados => coseno
        # Añadir por batches (si quieres evitar copia grande)
        for start in tqdm(range(0, n, BATCH_CONVERT), desc="Añadiendo vectores al índice (batches)"):
            end = min(start + BATCH_CONVERT, n)
            index.add(vectors[start:end])
    else:
        raise MemoryError("Forzando fallback a IVF")
except Exception as e:
    print("IndexFlatIP falló o lo forzamos. Usaremos IndexIVFFlat (requiere training).")
    # Fallback: IndexIVFFlat
    nlist = int(math.sqrt(n))  # regla práctica: sqrt(n) centroids
    quantizer = faiss.IndexFlatIP(d)
    index = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_INNER_PRODUCT)
    # Entrenar con una muestra aleatoria
    rng = np.random.default_rng(seed=42)
    sample_size = min(100000, n)
    sample_idx = rng.choice(n, sample_size, replace=False)
    print(f"Entrenando IVF con {sample_size} vectores...")
    index.train(vectors[sample_idx])
    # Añadir en batches
    for start in tqdm(range(0, n, BATCH_CONVERT), desc="Añadiendo vectores al índice IVF (batches)"):
        end = min(start + BATCH_CONVERT, n)
        index.add(vectors[start:end])

In [None]:
# ---- 5) Buscar k+1 vecinos (para luego quitar la propia canción) en batches con tqdm ----
k_search = K + 1
all_indices = np.empty((0, k_search), dtype=np.int64)
all_distances = np.empty((0, k_search), dtype=np.float32)

for start in tqdm(range(0, n, BATCH_SEARCH), desc="Buscando vecinos (batches)"):
    end = min(start + BATCH_SEARCH, n)
    q = vectors[start:end]
    distances_batch, indices_batch = index.search(q, k_search)   # distances: inner product (similitud)
    all_indices = np.vstack((all_indices, indices_batch))
    all_distances = np.vstack((all_distances, distances_batch))

# ---- 6) Para cada fila, quitar el propio índice y quedarnos con K vecinos ----
# indices retornan indices en el orden del DataFrame (0..n-1)
nearest_indices = np.zeros((n, K), dtype=np.int64)
nearest_sim = np.zeros((n, K), dtype=np.float32)

for i in range(n):
    inds = all_indices[i]
    sims = all_distances[i]
    # quitar el mismo elemento (puede no ser el primero si hay exact duplicates)
    mask = inds != i
    inds_filtered = inds[mask]
    sims_filtered = sims[mask]
    # si por alguna razón quedan menos de K, rellenar con -1 / 0
    if len(inds_filtered) < K:
        # rellenar
        pad = K - len(inds_filtered)
        inds_filtered = np.concatenate([inds_filtered, np.full(pad, -1, dtype=np.int64)])
        sims_filtered = np.concatenate([sims_filtered, np.zeros(pad, dtype=np.float32)])
    nearest_indices[i, :] = inds_filtered[:K]
    nearest_sim[i, :] = sims_filtered[:K]

In [None]:
# ---- 7) Mapear índices a IDs reales y crear columnas en el DataFrame ----
ids_array = canciones_br[ID_COL].values

for k in range(K):
    col_id = f'cancion_mas_cercana#{k+1}'
    col_sim = f'similitud_coseno#{k+1}'
    col_dist = f'distancia_coseno#{k+1}'

    neighbor_ids_vec = [
        ids_array[i] if (i != -1 and i < len(ids_array)) else None
        for i in nearest_indices[:, k]
    ]
    neighbor_sims_vec = [
        float(s) if not np.isnan(s) else 0.0
        for s in nearest_sim[:, k]
    ]
    neighbor_dists_vec = [1.0 - s for s in neighbor_sims_vec]

    canciones_br[col_id] = neighbor_ids_vec
    canciones_br[col_sim] = neighbor_sims_vec
    canciones_br[col_dist] = neighbor_dists_vec

# ---- 8) Guardar resultados e índice ----
import json
import pandas as pd

# Convertir emociones_vector a string JSON
if isinstance(canciones_br['emociones_vector'].iloc[0], (list, np.ndarray)):
    canciones_br['emociones_vector'] = canciones_br['emociones_vector'].apply(
        lambda x: json.dumps(x.tolist() if isinstance(x, np.ndarray) else x)
    )

# Convertir columnas tipo Period a string (solución al ArrowKeyError)
for col in canciones_br.columns:
    if pd.api.types.is_period_dtype(canciones_br[col]):
        canciones_br[col] = canciones_br[col].astype(str)

# Convertir columnas de vecinos si contienen objetos raros
for col in canciones_br.columns:
    if any(keyword in col for keyword in ['cancion_mas_cercana', 'similitud_coseno', 'distancia_coseno']):
        canciones_br[col] = canciones_br[col].apply(
            lambda x: x if isinstance(x, (float, int, str)) or x is None else str(x)
        )

print("Guardando índice FAISS y DataFrame resultante...")
faiss.write_index(index, INDEX_PATH)
canciones_br.to_parquet(OUT_DF_PATH, index=False, engine="fastparquet")

print("Listo. Columnas añadidas:")
print([c for c in canciones_br.columns if 'cancion_mas_cercana' in c or 'distancia_coseno' in c or 'similitud_coseno' in c])

In [None]:
canciones_br = canciones_br.drop(columns=[col for col in canciones_br.columns if col.startswith('distancia')])

In [None]:
# Conversión robusta de la columna 'emociones_vector' a una matriz numpy homogénea
def parse_vector(v):
    if isinstance(v, str):
        # Si la fila es string, la convertimos a lista
        return np.fromstring(v.strip("[]"), sep=' ', dtype='float32')
    elif isinstance(v, list):
        return np.array(v, dtype='float32')
    elif isinstance(v, np.ndarray):
        return v.astype('float32')
    else:
        return np.zeros(6, dtype='float32')  # fallback

# Aplicamos la conversión a todas las filas (con tqdm para progreso)
X_embeddings = np.vstack([parse_vector(v) for v in tqdm(canciones_br['emociones_vector'].values)])

print(f"Dimensión final: {X_embeddings.shape}")  # debería ser (709497, 6)


In [None]:
# 2. Creamos el índice FAISS
index = faiss.IndexFlatIP(X_embeddings.shape[1])  # IP = inner product (para coseno)
faiss.normalize_L2(X_embeddings)
index.add(X_embeddings)

# 3. Definimos los umbrales de diferencia
umbral_valence = 0.1          # puedes ajustarlo
umbral_valence_musical = 0.1  # puedes ajustarlo

# 4. Para cada canción, encontramos las más similares considerando los filtros
top_k = 100  # candidatos por FAISS antes de filtrar
resultados = []

for i in tqdm(range(len(canciones_br)), desc="Calculando similitudes filtradas"):
    # Embedding de la canción actual
    query = X_embeddings[i].reshape(1, -1)
    
    # Buscar vecinos más cercanos
    D, I = index.search(query, top_k)

    # Obtener valores de referencia
    val_ref = canciones_br.loc[i, "valence"]
    val_mus_ref = canciones_br.loc[i, "valence_musical"]

    # Filtrar por diferencias en valence y valence_musical
    candidatos = []
    for idx, sim in zip(I[0], D[0]):
        if idx == i:  # evitar la misma canción
            continue
        val = canciones_br.loc[idx, "valence"]
        val_mus = canciones_br.loc[idx, "valence_musical"]

        if (abs(val - val_ref) <= umbral_valence) and \
           (abs(val_mus - val_mus_ref) <= umbral_valence_musical):
            candidatos.append((idx, sim))

    # Ordenar por similitud descendente
    candidatos = sorted(candidatos, key=lambda x: x[1], reverse=True)

    resultados.append({
        "song_index": i,
        "similares_filtrados": [idx for idx, _ in candidatos]
    })

In [None]:
canciones_br['canciones_similares_filtradas'] = [r['similares_filtrados'] for r in resultados]
canciones_br.to_csv("canciones_filtradas.csv", index=False)

### 6) Añadir emociones mezcladas para posteriores usos en la recomendación

In [None]:
canciones_a_recomendar["emociones_vector"] = canciones_a_recomendar["emociones_vector"].apply(
    lambda x: np.fromstring(x.strip("[]"), sep=" ")
)

In [None]:
# Orden de emociones
emociones_orden = ["sadness", "joy", "love", "anger", "fear", "surprise"]

# Diccionario de etiquetas
mapping = {
    # --- singles ---
    ("anger",): "ira",
    ("fear",): "miedo",
    ("joy",): "alegría",
    ("love",): "amor",
    ("sadness",): "tristeza",
    ("surprise",): "sorpresa",

    # --- pares ---
    ("anger", "fear"): "frustración",
    ("anger", "joy"): "ironía alegre",
    ("anger", "love"): "pasión conflictiva",
    ("anger", "sadness"): "amargura",
    ("anger", "surprise"): "ira súbita",
    ("fear", "joy"): "euforia ansiosa",
    ("fear", "love"): "amor inseguro",
    ("fear", "sadness"): "desesperanza",
    ("fear", "surprise"): "miedo repentino",
    ("joy", "love"): "felicidad romántica",
    ("joy", "sadness"): "alegría agridulce",
    ("joy", "surprise"): "sorpresa alegre",
    ("love", "sadness"): "desamor",
    ("love", "surprise"): "amor sorprendente",
    ("sadness", "surprise"): "desconcierto triste",

    # --- tríos ---
    ("anger", "fear", "joy"): "ansiedad exaltada",
    ("anger", "fear", "love"): "amor tormentoso",
    ("anger", "fear", "sadness"): "desesperación furiosa",
    ("anger", "fear", "surprise"): "pánico violento",
    ("anger", "joy", "love"): "pasión eufórica",
    ("anger", "joy", "sadness"): "contraste emocional",
    ("anger", "joy", "surprise"): "explosión emocional",
    ("anger", "love", "sadness"): "relación tóxica",
    ("anger", "love", "surprise"): "pasión inesperada",
    ("anger", "sadness", "surprise"): "tristeza explosiva",
    ("fear", "joy", "love"): "amor ansioso",
    ("fear", "joy", "sadness"): "esperanza frágil",
    ("fear", "joy", "surprise"): "alegría nerviosa",
    ("fear", "love", "sadness"): "amor frágil",
    ("fear", "love", "surprise"): "amor sorpresivo",
    ("fear", "sadness", "surprise"): "angustia sorpresiva",
    ("joy", "love", "sadness"): "nostalgia romántica",
    ("joy", "love", "surprise"): "felicidad inesperada",
    ("joy", "sadness", "surprise"): "melancolía alegre",
    ("love", "sadness", "surprise"): "desamor inesperado",
    # Los demás tríos se cubrirán con fallback automático
}

def classify_vector(vec, threshold=0.10, max_k=3):
    """
    vec: np.array con 6 probabilidades en orden de emociones_orden
    """
    prob_dict = dict(zip(emociones_orden, vec))

    # 1) Filtrar ≥ threshold
    selected = [(emo, p) for emo, p in prob_dict.items() if p >= threshold]

    if not selected:  
        # Fallback → top1
        top1 = max(prob_dict.items(), key=lambda x: x[1])[0]
        key = (top1,)
    else:
        # Ordenar por prob desc y tomar hasta max_k
        selected_sorted = sorted(selected, key=lambda x: x[1], reverse=True)[:max_k]
        emos = [e for e,_ in selected_sorted]
        key = tuple(sorted(emos))  # canonicalizar (orden alfabético)

    # Buscar etiqueta en mapping
    label = mapping.get(key, " + ".join(key))  # fallback automático
    return label

# Aplicar al dataframe
canciones_a_recomendar["emocion_label"] = canciones_a_recomendar["emociones_vector"].apply(
    lambda v: classify_vector(np.array(eval(v)) if isinstance(v, str) else np.array(v))
)


In [None]:
canciones_a_recomendar.to_csv('canciones_filtradas.csv', index=False)

### 7) Creación de la base de recomendaciones

In [None]:
canciones_a_recomendar = pd.read_csv("canciones_flitradas.csv")

In [None]:
df = canciones_a_recomendar.copy()

# Si la columna es texto tipo "[0.63 0.01 ...]" la convertimos a lista
def parse_vector(x):
    if isinstance(x, str):
        # Quitar corchetes y convertir a floats
        x = x.strip("[]").split()
        return np.array(x, dtype=np.float32)
    elif isinstance(x, (list, np.ndarray)):
        return np.array(x, dtype=np.float32)
    else:
        return np.array([], dtype=np.float32)  # Por si hay nulos

df['emociones_vector'] = df['emociones_vector'].apply(parse_vector)

In [None]:
w_emociones = 0.7
w_valence = 0.2
w_valence_musical = 0.1

# Conversión robusta de 'emociones_vector'
def parse_vector(x):
    if isinstance(x, str):
        return np.fromstring(x.strip("[]"), sep=" ", dtype=np.float32)
    elif isinstance(x, (list, np.ndarray)):
        return np.array(x, dtype=np.float32)
    else:
        return np.array([], dtype=np.float32)

df['emociones_vector'] = df['emociones_vector'].apply(parse_vector)

# Verificar un ejemplo
print("Ejemplo convertido:", df['emociones_vector'].iloc[0])

# Construcción de la matriz final
X_emociones = np.vstack(df['emociones_vector'].values).astype('float32')
valence = df['valence'].astype('float32').values[:, None]
valence_musical = df['valence_musical'].astype('float32').values[:, None]

X_final = np.hstack([
    X_emociones * w_emociones,
    valence * w_valence,
    valence_musical * w_valence_musical
]).astype('float32')

# Normalizamos
faiss.normalize_L2(X_final)

# Creamos índice FAISS
index = faiss.IndexFlatIP(X_final.shape[1])
index.add(X_final)

# Número de vecinos (10 + 1 porque el primero es la misma canción)
k = 10 + 1  

# Buscamos los vecinos
distances, indices = index.search(X_final, k)

# Generar DataFrame de recomendaciones
recommendations = []
for i in range(len(df)):
    recs = [(df.iloc[idx]['id'], float(dist)) for idx, dist in zip(indices[i][1:], distances[i][1:])]
    recommendations.append(recs)

rec_df = pd.DataFrame({
    'id': df['id'],
    'name': df['name'],
    **{f'recom_{j+1}': [recs[j] for recs in recommendations] for j in range(10)}
})

print(rec_df.head())

In [None]:
rec_df.to_csv("rec_df.csv")