In [None]:
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics.pairwise import cosine_similarity
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, GRU, Dense, Dropout, Bidirectional, Concatenate, Layer
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf
import pickle
with open("tokenizer.pkl", "rb") as f:
    tokenizer = pickle.load(f)


class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.Wa = self.add_weight(name='Wa', shape=(input_shape[-1], input_shape[-1]), initializer='glorot_uniform', trainable=True)
        self.ba = self.add_weight(name='ba', shape=(input_shape[-1],), initializer='zeros', trainable=True)
        self.va = self.add_weight(name='va', shape=(input_shape[-1], 1), initializer='glorot_uniform', trainable=True)
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        uit = tf.tanh(tf.tensordot(inputs, self.Wa, axes=1) + self.ba)
        ait = tf.nn.softmax(tf.tensordot(uit, self.va, axes=1), axis=1)
        weighted_input = inputs * ait
        return tf.reduce_sum(weighted_input, axis=1)

    def get_config(self):
        config = super(AttentionLayer, self).get_config()
        return config


# --- Load & preprocess full dataset ---
nltk.download('stopwords')
df = pd.read_csv("msd_dataset_enriched_with_similar_songs.csv")
df = df[~df['lyrics'].isna() & ~df['lyrics'].str.strip().eq("")]
stopwords_set = set(stopwords.words('english'))

# Clean lyrics
def clean_lyrics(text):
    text = re.sub(r"[^a-zA-Z\s]", "", text.lower())
    tokens = text.split()
    return ' '.join([t for t in tokens if t not in stopwords_set])

df['cleaned_lyrics'] = df['lyrics'].apply(clean_lyrics)

# Define meta features
meta_features = ['duration','tempo','key','loudness'] + [
    f'pitch_mean_{i}' for i in range(12)
] + [f'pitch_std_{i}' for i in range(12)] + [f'timbre_mean_{i}' for i in range(12)] + [f'timbre_std_{i}' for i in range(12)]

# Fill NA and standardize meta features
scaler = StandardScaler()
df[meta_features] = df[meta_features].fillna(0)
X_meta_all = scaler.fit_transform(df[meta_features])


# Tokenize lyrics
# tokenizer = Tokenizer()
# tokenizer.fit_on_texts(df['cleaned_lyrics'])
X_lyrics_all = pad_sequences(tokenizer.texts_to_sequences(df['cleaned_lyrics']), maxlen=100)

# Load trained genre classifier model
best_model = load_model("gru_atten_model.h5", custom_objects={'AttentionLayer': AttentionLayer})
  # assumes model is saved here
label_encoder = LabelEncoder()
# label_encoder.fit(df[df['genre'].notna()]['genre'])
genre_labels = df[df['genre'].notna()]['genre']
if len(genre_labels) == 0:
    raise ValueError("No known genre labels available to train label encoder.")
label_encoder.fit(genre_labels)


# Predict genres for unlabeled data
df['predicted_genre'] = df['genre']
unlabeled_mask = df['genre'].isna()
y_pred = np.argmax(best_model.predict([X_lyrics_all[unlabeled_mask], X_meta_all[unlabeled_mask]]), axis=1)
df.loc[unlabeled_mask, 'predicted_genre'] = label_encoder.inverse_transform(y_pred)

# Build cosine similarity matrix
genre_vectors = X_meta_all
cos_sim_matrix = cosine_similarity(genre_vectors)

# Recommend top 5 songs based on genre and similarity
def recommend_similar_songs(song_index, top_k=5):
    
    target_genre = df.iloc[song_index]['predicted_genre']
    target_similarities = cos_sim_matrix[song_index]

    same_genre_mask = df['predicted_genre'] == target_genre
    target_similarities[~same_genre_mask] = -1  # mask different genres

    top_indices = np.argsort(target_similarities)[::-1][1:top_k+1]  # skip self
    return df.iloc[top_indices][['track_id', 'title', 'artist_name', 'predicted_genre', 'top_5_similar_songs']]

# Example
index = 123  # sample index to test
print("\n🎵 Recommendation for:", df.iloc[index]['title'])
print(recommend_similar_songs(index))


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/Claudia/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 32ms/step

🎵 Recommendation for: Cien Anos
                track_id                            title   artist_name  \
5502  TRAXXGI128F14688C7  Paper Tigers (acoustic version)  Tom Cochrane   
1200  TRAGROY128F92D2C53    This Kind Of Love (Full Band)  Sister Hazel   
9969  TRBBLFA12903CDC496                    I'll Be Yours   Faron Young   
5608  TRAEUVH128F422559A                   Hasta El Final  La Portuaria   
1850  TRATNHC12903CFA949                          Hey Joe    Carl Smith   

     predicted_genre                                top_5_similar_songs  
5502       Blues Rap  TRAXWQU128F42716E2; TRBBHPQ128F145A7F8; TRAHKR...  
1200       Blues Rap  TRBHNCW128F9334F3D; TRAAJJG128F4284B27; TRAYLR...  
9969       Blues Rap  TRAOXWF128E0790985; TRADCBA128F92E7161; TRBHFD...  
5608       Blues Rap  TRAGKAC128F4225537; TRACYOR128F427FB1D; TRAGMX...  
1850       Blues Rap  TRAUREN128F931F5AB; TRBBLFA12903CDC496; TRATES... 

In [40]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Load dataset
df = pd.read_csv("msd_dataset_enriched_with_similar_songs.csv")

# Clean any missing values
df = df[~df['track_id'].isna()]
df = df[~df['top_5_similar_songs'].isna()]

# Normalize and split similar song lists
def extract_similar_ids(sim_str):
    return [s.strip() for s in sim_str.split(";") if s.strip()]

# Build a set of all referenced song IDs
referenced_ids = set()
df['similar_ids'] = df['top_5_similar_songs'].apply(extract_similar_ids)
for ids in df['similar_ids']:
    referenced_ids.update(ids)

# Identify songs that are *not* referenced as similar
df['is_referenced'] = df['track_id'].apply(lambda tid: tid in referenced_ids)

# Try to put less referenced songs into the test set
not_referenced_df = df[~df['is_referenced']]
referenced_df = df[df['is_referenced']]

# Calculate 30% of total rows for test set
test_size = int(0.3 * len(df))

# Start with as many non-referenced songs as possible in the test set
n_test_from_not_referenced = min(test_size, len(not_referenced_df))
test_df = not_referenced_df.sample(n=n_test_from_not_referenced, random_state=42)

# Fill the remaining test slots from the referenced ones
remaining_test_size = test_size - len(test_df)
if remaining_test_size > 0:
    test_df = pd.concat([
        test_df,
        referenced_df.sample(n=remaining_test_size, random_state=42)
    ])

# Remaining data is training set
train_df = df.drop(test_df.index)

# Optional: check overlap
overlap = set(test_df['track_id']) & referenced_ids
print(f"Number of test songs that appear as similar songs: {len(overlap)}")

# Save to CSV
train_df.to_csv("train_set.csv", index=False)
test_df.to_csv("test_set.csv", index=False)


Number of test songs that appear as similar songs: 0


In [45]:
# Supervised Song Similarity Recommender
import pandas as pd
import numpy as np
import re
import random
import nltk
from nltk.corpus import stopwords
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, Concatenate, GRU, Embedding, Bidirectional, Layer
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow as tf
import ast
import pickle
from tqdm import tqdm

nltk.download('stopwords')
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Attention Layer
class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.Wa = self.add_weight(name='Wa', shape=(input_shape[-1], input_shape[-1]), initializer='glorot_uniform', trainable=True)
        self.ba = self.add_weight(name='ba', shape=(input_shape[-1],), initializer='zeros', trainable=True)
        self.va = self.add_weight(name='va', shape=(input_shape[-1], 1), initializer='glorot_uniform', trainable=True)
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        uit = tf.tanh(tf.tensordot(inputs, self.Wa, axes=1) + self.ba)
        ait = tf.nn.softmax(tf.tensordot(uit, self.va, axes=1), axis=1)
        weighted_input = inputs * ait
        return tf.reduce_sum(weighted_input, axis=1)

    def get_config(self):
        config = super(AttentionLayer, self).get_config()
        return config

# --- Load Data ---
# df = pd.read_csv("msd_dataset_enriched_with_similar_songs.csv")
df = pd.read_csv("train_set.csv")
df = df[~df['lyrics'].isna() & df['lyrics'].str.strip().ne("")]
df['top_5_similar_songs'] = df['top_5_similar_songs'].apply(
    lambda x: [s.strip() for s in x.split(';')] if isinstance(x, str) and ';' in x else []
)

# --- Preprocessing ---
stopwords_set = set(stopwords.words('english'))
def clean_text(text):
    text = re.sub(r"[^a-zA-Z\s]", "", str(text).lower())
    return ' '.join(word for word in text.split() if word not in stopwords_set)

df['cleaned_lyrics'] = df['lyrics'].apply(clean_text)
df['cleaned_title'] = df['title'].apply(clean_text)
df['cleaned_tags'] = df['artist_tags'].apply(clean_text)

# --- Predict missing genres using pretrained model ---
with open("tokenizer.pkl", "rb") as f:
    tokenizer = pickle.load(f)

X_lyrics_all = pad_sequences(tokenizer.texts_to_sequences(df['cleaned_lyrics']), maxlen=100)

# Only meta features used by pretrained genre model
genre_meta_features = ['duration','tempo','key','loudness'] + [
    f'pitch_mean_{i}' for i in range(12)
] + [f'pitch_std_{i}' for i in range(12)] + [f'timbre_mean_{i}' for i in range(12)] + [f'timbre_std_{i}' for i in range(12)]
df[genre_meta_features] = df[genre_meta_features].fillna(0)
meta_scaler = StandardScaler()
X_meta_all = meta_scaler.fit_transform(df[genre_meta_features])

best_model = load_model("gru_atten_model.h5", custom_objects={'AttentionLayer': AttentionLayer})
label_encoder = LabelEncoder()
label_encoder.fit(df[df['genre'].notna()]['genre'])

unlabeled_mask = df['genre'].isna()
y_pred = np.argmax(best_model.predict([X_lyrics_all[unlabeled_mask], X_meta_all[unlabeled_mask]]), axis=1)
classes = label_encoder.classes_
safe_y_pred = [classes[i] if i < len(classes) else "Unknown" for i in y_pred]
df.loc[unlabeled_mask, 'predicted_genre'] = safe_y_pred

# --- Use all relevant features for similarity model ---
exclude_cols = {'track_id', 'artist_name', 'artist_id', 'top_5_similar_songs', 'genre', 'lyrics', 'song_id', 'release', 'title', 'artist_tags'}
categorical_cols = ['predicted_genre']

all_feature_cols = [col for col in df.columns if col not in exclude_cols and col not in categorical_cols]

# One-hot encode predicted genre
df['predicted_genre'] = df['predicted_genre'].astype(str)
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
genre_encoded = ohe.fit_transform(df[['predicted_genre']])

# Scale only numeric features
feature_data = df[all_feature_cols].select_dtypes(include=[np.number]).fillna(0)
scaler = StandardScaler()
feature_scaled = scaler.fit_transform(feature_data)
X_meta = np.hstack([feature_scaled, genre_encoded])

# Lyrics input
X_lyrics = pad_sequences(tokenizer.texts_to_sequences(df['cleaned_lyrics']), maxlen=100)
X_title = pad_sequences(tokenizer.texts_to_sequences(df['cleaned_title']), maxlen=20)
X_tags = pad_sequences(tokenizer.texts_to_sequences(df['cleaned_tags']), maxlen=30)

# --- Build song embeddings (lyrics + meta + title + tags) ---
embedding_dim = 50
vocab_size = len(tokenizer.word_index) + 1

lyrics_input = Input(shape=(100,), name='lyrics_input')
lyrics_embed = Embedding(input_dim=vocab_size, output_dim=embedding_dim, trainable=True)(lyrics_input)
lyrics_output = Bidirectional(GRU(64))(lyrics_embed)

meta_input = Input(shape=(X_meta.shape[1],), name='meta_input')
meta_output = Dense(64, activation='relu')(meta_input)

title_input = Input(shape=(20,), name='title_input')
title_embed = Embedding(input_dim=vocab_size, output_dim=embedding_dim, trainable=True)(title_input)
title_output = Bidirectional(GRU(32))(title_embed)

tags_input = Input(shape=(30,), name='tags_input')
tags_embed = Embedding(input_dim=vocab_size, output_dim=embedding_dim, trainable=True)(tags_input)
tags_output = Bidirectional(GRU(32))(tags_embed)

combined = Concatenate()([lyrics_output, meta_output, title_output, tags_output])
embedding_model = Model(inputs=[lyrics_input, meta_input, title_input, tags_input], outputs=combined)

print("Generating song embeddings...")
song_embeddings = embedding_model.predict([X_lyrics, X_meta, X_title, X_tags], batch_size=64, verbose=1)
print("Embeddings generated:", song_embeddings.shape)

# --- Build pairs ---
def build_pairs(df, top_k=5, neg_ratio=2):
    print("Building positive and negative pairs...")
    pos_pairs, neg_pairs = [], []
    track_to_index = {tid: i for i, tid in enumerate(df['track_id'])}
    for i, row in tqdm(df.iterrows(), total=len(df)):
        anchor = i
        pos_added = 0
        for sim_id in row['top_5_similar_songs']:
            if sim_id in track_to_index:
                sim_idx = track_to_index[sim_id]
                if sim_idx < len(df):
                    pos_pairs.append((anchor, sim_idx, 1))
                    pos_added += 1

        if pos_added == 0:
            continue

        all_indices = list(range(len(df)))
        valid_negatives = [j for j in all_indices if j != anchor and df.iloc[j]['track_id'] not in row['top_5_similar_songs']]
        num_neg = min(neg_ratio * pos_added, len(valid_negatives))
        negatives = random.sample(valid_negatives, k=num_neg)
        for neg in negatives:
            if neg < len(df):
                neg_pairs.append((anchor, neg, 0))
    print("Done building pairs.")
    return pos_pairs + neg_pairs

pairs = build_pairs(df)

# --- Create training data ---
X_pair = []
y_pair = []

for anchor_idx, compare_idx, label in pairs:
    if anchor_idx >= len(song_embeddings) or compare_idx >= len(song_embeddings):
        continue  # skip out-of-bounds indices
    anchor_vec = song_embeddings[anchor_idx]
    compare_vec = song_embeddings[compare_idx]

    features = np.concatenate([anchor_vec, compare_vec, np.abs(anchor_vec - compare_vec)])
    X_pair.append(features)
    y_pair.append(label)

X_pair = np.array(X_pair)
y_pair = np.array(y_pair)
print("Number of training pairs created:", len(X_pair))

# --- Similarity Classifier ---
sim_input = Input(shape=(X_pair.shape[1],))
z = Dense(128, activation='relu')(sim_input)
z = Dropout(0.3)(z)
z = Dense(64, activation='relu')(z)
out = Dense(1, activation='sigmoid')(z)

sim_model = Model(sim_input, out)
sim_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

print("Starting training of similarity model...")
sim_model.fit(X_pair, y_pair, batch_size=64, epochs=10, validation_split=0.2, verbose=1,
              callbacks=[EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)])
print("Training complete.")



# --- Save models and preprocessing objects ---
embedding_model.save("embedding_model.keras")
sim_model.save("similarity_model.keras")

with open("tokenizer.pkl", "wb") as f:
    pickle.dump(tokenizer, f)
with open("scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)
with open("meta_scaler.pkl", "wb") as f:
    pickle.dump(meta_scaler, f)
with open("ohe.pkl", "wb") as f:
    pickle.dump(ohe, f)
with open("label_encoder.pkl", "wb") as f:
    pickle.dump(label_encoder, f)
np.save("song_embeddings.npy", song_embeddings)


def preprocess_test_df(df_test):
    print("Preprocessing test set...")
    df_test = df_test[~df_test['lyrics'].isna() & df_test['lyrics'].str.strip().ne("")].copy()

    # Clean and transform
    for col in ['lyrics', 'title', 'artist_tags']:
        df_test[f'cleaned_{col}'] = df_test[col].apply(clean_text)

    X_lyrics = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_lyrics']), maxlen=100)
    X_title = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_title']), maxlen=20)
    X_tags = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_tags']), maxlen=30)

    feature_data = df_test[all_feature_cols].select_dtypes(include=[np.number]).fillna(0)
    feature_scaled = scaler.transform(feature_data)

    # Fill missing genres
    df_test['predicted_genre'] = df_test['genre']
    unlabeled_mask = df_test['genre'].isna()
    if unlabeled_mask.sum() > 0:
        X_meta_test_for_genre = meta_scaler.transform(df_test[genre_meta_features].fillna(0))
        X_lyrics_for_genre = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_lyrics']), maxlen=100)
        y_pred_test = np.argmax(best_model.predict([X_lyrics_for_genre[unlabeled_mask], X_meta_test_for_genre[unlabeled_mask]]), axis=1)
        df_test.loc[unlabeled_mask, 'predicted_genre'] = [label_encoder.classes_[i] if i < len(label_encoder.classes_) else "Unknown" for i in y_pred_test]

    genre_ohe = ohe.transform(df_test[['predicted_genre']].astype(str))
    X_meta = np.hstack([feature_scaled, genre_ohe])

    return df_test, [X_lyrics, X_meta, X_title, X_tags]

def recommend_similar_from_training(song_index, test_embeddings, top_k=5):
    target_vec = test_embeddings[song_index]
    scores = []
    for i in tqdm(range(len(song_embeddings)), desc="Scoring similarities"):
        comp_vec = song_embeddings[i]
        pair_vec = np.concatenate([target_vec, comp_vec, np.abs(target_vec - comp_vec)])
        score = sim_model.predict(np.expand_dims(pair_vec, axis=0), verbose=0)[0][0]
        scores.append((i, score))
    top_matches = sorted(scores, key=lambda x: x[1], reverse=True)[:top_k]
    return df.iloc[[i for i, _ in top_matches]][['track_id', 'title', 'artist_name']]


df_test = pd.read_csv("test_set.csv")
df_test, test_inputs = preprocess_test_df(df_test)

print("\n🎵 Test Recommendation for:", df_test.iloc[0]['title'])
print(recommend_similar_from_training(0, embedding_model.predict(test_inputs)))




[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/Claudia/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 28ms/step
Generating song embeddings...
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 58ms/step
Embeddings generated: (1716, 320)
Building positive and negative pairs...


100%|██████████| 1716/1716 [03:23<00:00,  8.45it/s]

Done building pairs.
Number of training pairs created: 3042
Starting training of similarity model...
Epoch 1/10





[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - accuracy: 0.5615 - loss: 0.6879 - val_accuracy: 0.9261 - val_loss: 0.4707
Epoch 2/10
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.6159 - loss: 0.6487 - val_accuracy: 0.6634 - val_loss: 0.6128
Epoch 3/10
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.6458 - loss: 0.6259 - val_accuracy: 0.6716 - val_loss: 0.6124
Training complete.
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step

🎵 Test Recommendation for: C'est pas ma faute
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
                track_id                            title          artist_name
5817  TRBAKRM128F4248228        The End. [Live In Mexico]  My Chemical Romance
2211  TRAFLZY128F148959F  House Of Wolves (Album Version)  My Chemical Romance
1574  TRASUJK128E0789C12            Speed Of Sound (Live)             Coldplay
4

In [56]:
# Supervised Song Similarity Recommender
import pandas as pd
import numpy as np
import re
import random
import nltk
from nltk.corpus import stopwords
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, Concatenate, GRU, Embedding, Bidirectional, Layer
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow as tf
import ast
import pickle
from tqdm import tqdm
import os

nltk.download('stopwords')
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# --- Load saved components ---
embedding_model = load_model("embedding_model.keras")
sim_model = load_model("similarity_model.keras")
song_embeddings = np.load("song_embeddings.npy")

with open("tokenizer.pkl", "rb") as f:
    tokenizer = pickle.load(f)
with open("scaler.pkl", "rb") as f:
    scaler = pickle.load(f)
with open("meta_scaler.pkl", "rb") as f:
    meta_scaler = pickle.load(f)
with open("ohe.pkl", "rb") as f:
    ohe = pickle.load(f)
with open("label_encoder.pkl", "rb") as f:
    label_encoder = pickle.load(f)
# with open("gru_atten_model.h5", "rb") as f:
#     best_model = load_model(f, custom_objects={'AttentionLayer': Layer})
best_model = load_model("gru_atten_model.h5", custom_objects={'AttentionLayer': AttentionLayer})


# --- Reuse genre features list and cleaner ---
genre_meta_features = ['duration','tempo','key','loudness'] + [
    f'pitch_mean_{i}' for i in range(12)
] + [f'pitch_std_{i}' for i in range(12)] + [f'timbre_mean_{i}' for i in range(12)] + [f'timbre_std_{i}' for i in range(12)]

stopwords_set = set(stopwords.words('english'))
def clean_text(text):
    text = re.sub(r"[^a-zA-Z\s]", "", str(text).lower())
    return ' '.join(word for word in text.split() if word not in stopwords_set)

exclude_cols = {'track_id', 'artist_name', 'artist_id', 'top_5_similar_songs', 'genre', 'lyrics', 'song_id', 'release', 'title', 'artist_tags'}
categorical_cols = ['predicted_genre']

# Load full dataset to get consistent feature columns
df_full = pd.read_csv("msd_dataset_enriched_with_similar_songs.csv")
df_full = df_full[~df_full['lyrics'].isna() & df_full['lyrics'].str.strip().ne("")].copy()
df_full['genre'] = df_full['genre'].fillna("Unknown")

# Predict genres for missing entries in full set
X_meta_all = meta_scaler.transform(df_full[genre_meta_features].fillna(0))
X_lyrics_all = pad_sequences(tokenizer.texts_to_sequences(df_full['lyrics'].apply(clean_text)), maxlen=100)
unlabeled_mask = df_full['genre'] == "Unknown"
y_pred = np.argmax(best_model.predict([X_lyrics_all[unlabeled_mask], X_meta_all[unlabeled_mask]]), axis=1)
df_full.loc[unlabeled_mask, 'genre'] = label_encoder.inverse_transform(y_pred)
df_full['predicted_genre'] = df_full['genre']

all_feature_cols = [col for col in df_full.columns if col not in exclude_cols and col not in categorical_cols]

# --- Preprocess Test Data ---
def preprocess_test_df(df_test):
    print("Preprocessing test set...")
    df_test = df_test[~df_test['lyrics'].isna() & df_test['lyrics'].str.strip().ne("")].copy()

    for col in ['lyrics', 'title', 'artist_tags']:
        df_test[f'cleaned_{col}'] = df_test[col].apply(clean_text)

    X_lyrics = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_lyrics']), maxlen=100)
    X_title = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_title']), maxlen=20)
    X_tags = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_artist_tags']), maxlen=30)

    feature_data = df_test[all_feature_cols].select_dtypes(include=[np.number]).fillna(0)
    feature_scaled = scaler.transform(feature_data)

    # Predict genre if missing
    df_test['predicted_genre'] = df_test['genre']
    unlabeled_mask = df_test['genre'].isna()
    if unlabeled_mask.sum() > 0:
        X_meta_test_for_genre = meta_scaler.transform(df_test[genre_meta_features].fillna(0))
        X_lyrics_for_genre = pad_sequences(tokenizer.texts_to_sequences(df_test['cleaned_lyrics']), maxlen=100)
        y_pred_test = np.argmax(best_model.predict([X_lyrics_for_genre[unlabeled_mask], X_meta_test_for_genre[unlabeled_mask]]), axis=1)
        df_test.loc[unlabeled_mask, 'predicted_genre'] = label_encoder.inverse_transform(y_pred_test)

    genre_ohe = ohe.transform(df_test[['predicted_genre']].astype(str))
    X_meta = np.hstack([feature_scaled, genre_ohe])

    return df_test, [X_lyrics, X_meta, X_title, X_tags]

# --- Predict All Recommendations with Incremental Saving ---
def evaluate_recommendations(df_test, test_embeddings, output_file="test_recommendations.csv", top_k=5):
    print("Evaluating and generating recommendations...")

    already_written = set()
    if os.path.exists(output_file):
        existing_df = pd.read_csv(output_file)
        already_written = set(existing_df['test_track_id'])
        mode = 'a'
        header = False
    else:
        mode = 'w'
        header = True

    with open(output_file, mode, newline='') as f:
        for idx in tqdm(range(len(df_test))):
            track_id = df_test.iloc[idx]['track_id']
            if track_id in already_written:
                continue

            target_vec = test_embeddings[idx]
            scores = []
            for i in range(len(song_embeddings)):
                comp_vec = song_embeddings[i]
                pair_vec = np.concatenate([target_vec, comp_vec, np.abs(target_vec - comp_vec)])
                score = sim_model.predict(np.expand_dims(pair_vec, axis=0), verbose=0)[0][0]
                scores.append((i, score))

            top_matches = sorted(scores, key=lambda x: x[1], reverse=True)[:top_k]
            top_songs = df_full.iloc[[i for i, _ in top_matches]][['track_id', 'title', 'artist_name', 'genre']]
            match_ids = top_songs['track_id'].astype(str).tolist()
            match_titles = top_songs['title'].astype(str).tolist()
            match_artists = top_songs['artist_name'].astype(str).tolist()
            match_genres = top_songs['genre'].astype(str).tolist()

            true_matches = df_test.iloc[idx]['top_5_similar_songs'].split(';') if isinstance(df_test.iloc[idx]['top_5_similar_songs'], str) else []
            hit = any(pred in true_matches for pred in match_ids)

            line = pd.DataFrame([{
                'test_track_id': track_id,
                'test_title': df_test.iloc[idx]['title'],
                'test_artist': df_test.iloc[idx]['artist_name'],
                'test_genre': df_test.iloc[idx]['predicted_genre'],
                'recommended_track_ids': ';'.join(match_ids),
                'recommended_titles': ';'.join(match_titles),
                'recommended_artists': ';'.join(match_artists),
                'recommended_genres': ';'.join(match_genres),
                'hit': hit
            }])
            line.to_csv(f, index=False, header=header)
            f.flush()
            os.fsync(f.fileno())
            header = False

    print(f"Evaluation complete. Results saved to {output_file}")

# --- Run Inference and Save Output ---
df_test = pd.read_csv("test_set.csv")
df_test, test_inputs = preprocess_test_df(df_test)
test_embeddings = embedding_model.predict(test_inputs, batch_size=64, verbose=1)
evaluate_recommendations(df_test, test_embeddings)


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/Claudia/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 15ms/step
Preprocessing test set...
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 75ms/step
Evaluating and generating recommendations...


100%|██████████| 622/622 [8:46:46<00:00, 50.81s/it]  

Evaluation complete. Results saved to test_recommendations.csv





In [58]:
import pandas as pd
from tabulate import tabulate

# Load CSVs
df_rec = pd.read_csv("test_recommendations.csv")
df_test = pd.read_csv("test_set.csv")
df_test = df_test.set_index("track_id")  # Easier access

with open("formatted_recommendations.txt", "w", encoding="utf-8") as f:
    for idx, row in df_rec.iterrows():
        test_id = row['test_track_id']
        f.write(f"\n🎵 Test Recommendation for: {row['test_title']} by {row['test_artist']} (Genre: {row['test_genre']})\n")
        
        # Ground truth top 5 similar songs
        if test_id in df_test.index and isinstance(df_test.loc[test_id, "top_5_similar_songs"], str):
            true_similars = set(df_test.loc[test_id, "top_5_similar_songs"].replace(" ", "").split(";"))
        else:
            true_similars = set()

        # Extract predicted recommendation data
        rec_ids = row['recommended_track_ids'].split(';')
        rec_titles = row['recommended_titles'].split(';')
        rec_artists = row['recommended_artists'].split(';')
        rec_genres = row['recommended_genres'].split(';')

        # Combine and sort by overlap count
        combined = list(zip(rec_ids, rec_titles, rec_artists, rec_genres))
        combined.sort(key=lambda x: x[0] in true_similars, reverse=True)

        # Build table
        f.write(tabulate(combined, headers=["track_id", "title", "artist_name", "genre"], tablefmt="github"))
        f.write("\n" + "="*80 + "\n")
