In [1]:
%pip install -q implicit scipy

Note: you may need to restart the kernel to use updated packages.


In [10]:
import pickle
from pathlib import Path
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from implicit.als import AlternatingLeastSquares
from scipy.sparse import csr_matrix
import pandas as pd
from tqdm import tqdm
from collections import defaultdict, Counter
from itertools import combinations
import random
import math


In [11]:
input_path = "processed/01_filtered/"

with open(Path(input_path) / "filtered_playlists.pkl", "rb") as f:
    playlists = pickle.load(f)  # list of lists of track_uris

with open(Path(input_path) / "valid_tracks.pkl", "rb") as f:
    valid_tracks_dict = pickle.load(f)  # dict from track_uri -> metadata dict

In [12]:
def filter_valid_tracks(playlists, valid_tracks):
    filtered = []
    for pl in tqdm(playlists, total=len(playlists), desc="Filtering playlists"):
        filtered_tracks = [t for t in pl['tracks'] if t in valid_tracks]
        filtered.append({'name': pl['name'], 'tracks': filtered_tracks})
    return filtered

filtered_playlists = filter_valid_tracks(playlists, valid_tracks_dict)

Filtering playlists: 100%|██████████| 774682/774682 [00:03<00:00, 249569.11it/s]


In [14]:
print(filtered_playlists[0])

{'name': 'Baile', 'tracks': ['spotify:track:3ZFTkvIE7kyPt6Nu3PEa7V']}


In [4]:
# Train/test split (80/20)
random.seed(42)
idxs = list(range(len(filtered_playlists)))
random.shuffle(idxs)
split_idx = int(0.8 * len(idxs))

train_idxs = idxs[:split_idx]
test_idxs = idxs[split_idx:]

train_playlists = [filtered_playlists[i] for i in train_idxs]
test_playlists = [filtered_playlists[i] for i in test_idxs]

print(f"Train playlists: {len(train_playlists)}, Test playlists: {len(test_playlists)}")

Train playlists: 619745, Test playlists: 154937


In [5]:
# Build inverted index from training data only
track_to_playlists = defaultdict(set)
for pid, pl in tqdm(enumerate(train_playlists), total=len(train_playlists), desc="Building inverted index"):
    for track in pl['tracks']:
        track_to_playlists[track].add(pid)

Building inverted index: 100%|██████████| 619745/619745 [00:01<00:00, 321055.90it/s]


In [6]:
# Recommendation function
def find_similar_playlists(query_tracks, track_to_pl, max_neighbors=500):
    candidate_counts = Counter()
    for track in query_tracks:
        candidate_counts.update(track_to_pl.get(track, set()))
    most_common = candidate_counts.most_common(max_neighbors)
    similar_pids = [pid for pid, count in most_common]
    return similar_pids

def recommend_tracks(query_playlist, train_playlists, track_to_pl, top_k=100, max_neighbors=500):
    query_tracks = query_playlist['tracks']
    similar_pids = find_similar_playlists(query_tracks, track_to_pl, max_neighbors=max_neighbors)
    track_counter = Counter()
    for pid in similar_pids:
        pl = train_playlists[pid]
        track_counter.update(pl['tracks'])
    existing_tracks = set(query_tracks)
    recommendations = [t for t, _ in track_counter.most_common() if t not in existing_tracks]
    return recommendations[:top_k]

In [7]:


# Evaluation metrics
def recall_at_k(true_tracks, pred_tracks, k):
    true_set = set(true_tracks)
    pred_set = set(pred_tracks[:k])
    if not true_set:
        return 0.0
    return len(true_set & pred_set) / len(true_set)

def ndcg_at_k(true_tracks, pred_tracks, k):
    true_set = set(true_tracks)
    dcg = 0.0
    idcg = 0.0
    for i in range(k):
        if i < len(pred_tracks) and pred_tracks[i] in true_set:
            dcg += 1 / math.log2(i + 2)
    # Ideal DCG: all true tracks ranked perfectly
    for i in range(min(len(true_tracks), k)):
        idcg += 1 / math.log2(i + 2)
    return dcg / idcg if idcg > 0 else 0.0

# Prepare test data: split test playlists into input (partial) and target (held-out tracks)
def split_test_playlist(pl, test_ratio=0.3, min_heldout=1):
    tracks = pl['tracks']
    n = len(tracks)
    if n < 2:
        return None, None
    n_heldout = max(int(n * test_ratio), min_heldout)
    heldout = tracks[-n_heldout:]
    input_tracks = tracks[:-n_heldout]
    if len(input_tracks) == 0:
        # fallback: keep at least one track in input
        input_tracks = tracks[:1]
        heldout = tracks[1:]
    return input_tracks, heldout

In [8]:

recall_scores = []
ndcg_scores = []
top_k = 100

print("Starting evaluation on test set...")
for pl in tqdm(random.sample(test_playlists, k=1000), total=1000, desc="Evaluating playlists"):
    input_tracks, heldout_tracks = split_test_playlist(pl)
    if input_tracks is None or heldout_tracks is None or len(heldout_tracks) == 0:
        continue
    input_pl = {'name': pl['name'], 'tracks': input_tracks}
    preds = recommend_tracks(input_pl, train_playlists, track_to_playlists, top_k=top_k)
    recall_scores.append(recall_at_k(heldout_tracks, preds, top_k))
    ndcg_scores.append(ndcg_at_k(heldout_tracks, preds, top_k))

print(f"Recall@{top_k}: {np.mean(recall_scores):.4f}")
print(f"NDCG@{top_k}: {np.mean(ndcg_scores):.4f}")

Starting evaluation on test set...


Evaluating playlists: 100%|██████████| 1000/1000 [00:22<00:00, 45.15it/s]

Recall@100: 0.6890
NDCG@100: 0.3072



