In [2]:
!pip install pandas numpy torch

Collecting pandas
  Using cached pandas-2.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
Collecting numpy
  Downloading numpy-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Collecting torch
  Downloading torch-2.6.0-cp310-cp310-manylinux1_x86_64.whl.metadata (28 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting filelock (from torch)
  Downloading filelock-3.17.0-py3-none-any.whl.metadata (2.9 kB)
Collecting networkx (from torch)
  Using cached networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting jinja2 (from torch)
  Using cached jinja2-3.1.5-py3-none-any.whl.metadata (2.6 kB)
Collecting fsspec (from torch)
  Downloading fsspec-2025.2.0-py3-none-any.whl.metadata (11 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Using cached

In [1]:
import pandas as pd
import torch
import numpy as np
from torch_geometric.data import Data

# Load your CSV file.
# Change 'path/to/your_file.csv' to the actual path.
df = pd.read_csv('spotify_songs.csv')

# Inspect the header (should match your provided header)
print(df.columns)
# Output: Index(['track_id', 'track_name', 'track_artist', 'track_popularity',
#                'track_album_id', 'track_album_name', 'track_album_release_date',
#                'playlist_name', 'playlist_id', 'playlist_genre',
#                'playlist_subgenre', 'danceability', 'energy', 'key', 'loudness',
#                'mode', 'speechiness', 'acousticness', 'instrumentalness',
#                'liveness', 'valence', 'tempo', 'duration_ms'], dtype='object')

# --- Create Node Sets ---
# We use playlists as one set and tracks as the other.
playlist_ids = df['playlist_id'].unique()
track_ids = df['track_id'].unique()

num_playlists = len(playlist_ids)
num_tracks = len(track_ids)
total_nodes = num_playlists + num_tracks

# Create mapping dictionaries:
# For playlists, assign indices 0 .. num_playlists-1.
playlist_id_to_index = {pid: i for i, pid in enumerate(playlist_ids)}
# For tracks, assign indices num_playlists .. total_nodes-1.
track_id_to_index = {tid: i + num_playlists for i, tid in enumerate(track_ids)}

# --- Build Edges ---
# Each row represents an edge from a playlist to a track.
source_nodes = []  # playlist node indices
target_nodes = []  # track node indices
for _, row in df.iterrows():
    pid = row['playlist_id']
    tid = row['track_id']
    # Only add an edge if the mapping exists.
    if pid in playlist_id_to_index and tid in track_id_to_index:
        source_nodes.append(playlist_id_to_index[pid])
        target_nodes.append(track_id_to_index[tid])

# Create undirected edges by adding both (playlist -> track) and (track -> playlist)
edge_index = torch.tensor(
    [source_nodes + target_nodes, target_nodes + source_nodes], dtype=torch.long
)

# --- Build Node Features ---
# We will assign features only to the track nodes using audio features.
# For playlists (nodes 0 to num_playlists-1), we initialize with zeros.
# For tracks, we use a vector composed of:
#   [danceability, energy, key, loudness, mode, speechiness, acousticness,
#    instrumentalness, liveness, valence, tempo, duration_ms]
track_feature_cols = ['danceability', 'energy', 'key', 'loudness', 'mode',
                      'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                      'valence', 'tempo', 'duration_ms']
num_features = len(track_feature_cols)

# Initialize a feature matrix for all nodes.
# For playlists, we use zeros; for tracks, we’ll fill in the audio features.
x = torch.zeros((total_nodes, num_features), dtype=torch.float)

# Get a DataFrame of unique track rows (using the first occurrence for each track).
track_df = df.drop_duplicates('track_id').set_index('track_id')

# Fill in track node features.
for tid, node_idx in track_id_to_index.items():
    if tid in track_df.index:
        row = track_df.loc[tid]
        # Convert the selected features into a list of floats.
        feat_values = [row[col] for col in track_feature_cols]
        x[node_idx] = torch.tensor(feat_values, dtype=torch.float)

# Now we have:
# - x: Node feature matrix (playlists have zero vectors; tracks have audio features)
# - edge_index: Graph connectivity (playlist-track interactions)
data = Data(x=x, edge_index=edge_index)
print(f"Total nodes: {total_nodes} (Playlists: {num_playlists}, Tracks: {num_tracks})")

Index(['track_id', 'track_name', 'track_artist', 'track_popularity',
       'track_album_id', 'track_album_name', 'track_album_release_date',
       'playlist_name', 'playlist_id', 'playlist_genre', 'playlist_subgenre',
       'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
       'duration_ms'],
      dtype='object')
Total nodes: 28827 (Playlists: 471, Tracks: 28356)


In [2]:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv

class GNNRec(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GNNRec, self).__init__()
        # First GCN layer.
        self.conv1 = GCNConv(in_channels, hidden_channels)
        # Second GCN layer.
        self.conv2 = GCNConv(hidden_channels, out_channels)
        
    def forward(self, x, edge_index):
        # First convolution + activation.
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        # Optional dropout.
        x = F.dropout(x, training=self.training)
        # Second convolution to produce final embeddings.
        x = self.conv2(x, edge_index)
        return x

# Instantiate the model.
# in_channels equals the feature dimension (num_features).
# hidden_channels and out_channels can be tuned.
model = GNNRec(in_channels=num_features, hidden_channels=128, out_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)


In [3]:
# For training, we will need the positive edge indices corresponding to playlist->track edges.
# Recall that in our undirected graph, the first half of the edge_index corresponds to these.
pos_playlist_nodes = edge_index[0][:len(source_nodes)]
pos_track_nodes = edge_index[1][:len(source_nodes)]

num_pos_edges = pos_playlist_nodes.shape[0]
num_epochs = 100

model.train()
for epoch in range(num_epochs):
    optimizer.zero_grad()
    
    # Compute node embeddings.
    embeddings = model(data.x, data.edge_index)
    
    # --- Positive Samples ---
    pos_playlist_emb = embeddings[pos_playlist_nodes]
    pos_track_emb = embeddings[pos_track_nodes]
    pos_scores = (pos_playlist_emb * pos_track_emb).sum(dim=1)
    pos_loss = -torch.log(torch.sigmoid(pos_scores) + 1e-15).mean()
    
    # --- Negative Sampling ---
    # Randomly sample playlist and track indices.
    neg_playlist_nodes = torch.randint(0, num_playlists, (num_pos_edges,))
    neg_track_nodes = torch.randint(num_playlists, total_nodes, (num_pos_edges,))
    neg_playlist_emb = embeddings[neg_playlist_nodes]
    neg_track_emb = embeddings[neg_track_nodes]
    neg_scores = (neg_playlist_emb * neg_track_emb).sum(dim=1)
    neg_loss = -torch.log(1 - torch.sigmoid(neg_scores) + 1e-15).mean()
    
    # Total loss.
    loss = pos_loss + neg_loss
    loss.backward()
    optimizer.step()
    
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {loss.item():.4f}")


Epoch 10/100 - Loss: 34.5388
Epoch 20/100 - Loss: 34.5388
Epoch 30/100 - Loss: 34.5388
Epoch 40/100 - Loss: 34.5388
Epoch 50/100 - Loss: 34.5388
Epoch 60/100 - Loss: 34.5388
Epoch 70/100 - Loss: 34.5388
Epoch 80/100 - Loss: 34.5388
Epoch 90/100 - Loss: 34.5388
Epoch 100/100 - Loss: 34.5388


In [5]:
model.eval()
with torch.no_grad():
    final_embeddings = model(data.x, data.edge_index)

# Separate playlist and track embeddings.
playlist_embeddings = final_embeddings[:num_playlists]  # indices 0 to num_playlists-1
track_embeddings = final_embeddings[
    num_playlists:
]  # indices num_playlists to total_nodes-1

# For a given playlist (e.g., playlist with index 0), compute scores with all tracks.
playlist_index = 0  # change this to the desired playlist index
playlist_emb = playlist_embeddings[playlist_index]
scores = torch.matmul(track_embeddings, playlist_emb)

# Get the top-10 recommended track indices (relative to the track node set)
topk = 10
_, top_track_indices = torch.topk(scores, topk)
print(
    f"Top {topk} recommended track node indices for playlist {playlist_index}: {top_track_indices.tolist()}"
)

# Build an inverse mapping from track node index to track_id.
# Recall: track nodes are numbered from num_playlists to total_nodes-1.
inv_track_map = {v: k for k, v in track_id_to_index.items()}

# Retrieve the track IDs corresponding to the recommended track node indices,
# and then use the track metadata DataFrame (`track_df`) to get the track names.
recommended_track_names = []
for idx in top_track_indices.tolist():
    # Convert the local track index (0...num_tracks-1) to the actual node index.
    node_index = idx + num_playlists
    track_id = inv_track_map.get(node_index)
    if track_id is not None and track_id in track_df.index:
        # Assuming 'track_name' is the column with the song name.
        track_name = track_df.loc[track_id]["track_name"]
        recommended_track_names.append(track_name)
    else:
        recommended_track_names.append("Unknown Track")

print("Recommended Track Names:", recommended_track_names)

Top 10 recommended track node indices for playlist 0: [3403, 133, 344, 28158, 28185, 28204, 28138, 28184, 28202, 28159]
Recommended Track Names: ["Sweet Child O' Mine", 'Closer (feat. Halsey)', 'bad guy', 'Hypnotic Energy', 'Blabla', 'El Segundo', 'Toro - Original Mix', 'Mental Vortex', 'Samurai', 'Billy Boy']


In [8]:
import torch
import torch.nn.functional as F

# Ensure your model is in evaluation mode.
model.eval()
with torch.no_grad():
    # Compute the final node embeddings for all nodes.
    final_embeddings = model(data.x, data.edge_index)

# Separate playlist and track embeddings.
# (Playlist nodes: indices 0 to num_playlists-1;
#  Track nodes: indices num_playlists to total_nodes-1)
playlist_embeddings = final_embeddings[:num_playlists]
track_embeddings = final_embeddings[num_playlists:]

# Build an inverse mapping from track node index (global index) to track_id.
inv_track_map = {v: k for k, v in track_id_to_index.items()}

def get_track_node_index_by_name(query_name):
    """
    Given a song name, try to locate its track_id in track_df and return
    the corresponding node index using the track_id_to_index mapping.
    The search is case-insensitive and requires an exact match.
    """
    # Use a case-insensitive match on the 'track_name' column.
    matches = track_df[track_df['track_name'].str.lower() == query_name.lower()]
    if matches.empty:
        return None, None
    else:
        # Use the first match.
        track_id = matches.index[0]
        node_index = track_id_to_index.get(track_id)
        return track_id, node_index

# --- Interactive Query Section ---
while True:
    query_name = input("Enter a song name (or 'quit' to exit): ").strip()
    if query_name.lower() == 'quit':
        break

    track_id, node_index = get_track_node_index_by_name(query_name)
    if node_index is None:
        print("Song not found in the dataset. Please try again.")
        continue

    # Retrieve the embedding of the input song.
    # (node_index is the global node index.)
    song_embedding = final_embeddings[node_index]

    # Compute similarity (dot product) between the input song embedding
    # and all track embeddings.
    # track_embeddings is of size [num_tracks, embedding_dim]
    similarities = torch.matmul(track_embeddings, song_embedding)

    # Since the input song is among the track embeddings, we remove it from the results.
    # Compute the local index of the song within the track_embeddings matrix.
    local_index = node_index - num_playlists
    similarities[local_index] = -float("inf")  # exclude itself

    # Get the top-10 similar track indices (local indices within track_embeddings)
    topk = 10
    top_sim_values, top_indices = torch.topk(similarities, topk)

    # Map local indices back to global node indices and then to track IDs,
    # then retrieve the song names from track_df.
    recommended_track_names = []
    for local_idx in top_indices.tolist():
        global_node_idx = local_idx + num_playlists
        rec_track_id = inv_track_map.get(global_node_idx, None)
        if rec_track_id is not None:
            # Get the track name from track_df.
            rec_track_name = track_df.loc[rec_track_id]['track_name']
            recommended_track_names.append(rec_track_name)
        else:
            recommended_track_names.append("Unknown Track")

    print("\nTop 10 recommended songs similar to '{}' are:".format(query_name))
    for i, name in enumerate(recommended_track_names, 1):
        print(f"{i}. {name}")
    print("\n")



Top 10 recommended songs similar to 'Memories' are:
1. Sweet Child O' Mine
2. Closer (feat. Halsey)
3. bad guy
4. Hypnotic Energy
5. Blabla
6. El Segundo
7. Toro - Original Mix
8. Mental Vortex
9. Samurai
10. Billy Boy



Top 10 recommended songs similar to 'Body On My' are:
1. Sweet Child O' Mine
2. Closer (feat. Halsey)
3. bad guy
4. Hypnotic Energy
5. Blabla
6. El Segundo
7. Toro - Original Mix
8. Mental Vortex
9. Samurai
10. Billy Boy


