In [1]:
%pip install torch-geometric

Collecting torch-geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m43.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch-geometric
Successfully installed torch-geometric-2.6.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch
import torch_geometric
from torch_geometric.data import Data
import pandas as pd
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch.nn import Linear
import csv
import random

In [3]:
# Open the CSV file and process line by line
with open('/kaggle/input/spotify-playlist/spotify_dataset.csv', mode='r', encoding='utf-8') as file:
    reader = csv.reader(file)
    valid_rows = []

    # Read all rows and filter only those with exactly 4 columns
    all_rows = list(reader)
    valid_rows = [row for row in all_rows if len(row) == 4]  # Ensure only rows with 4 columns are kept

# Sample a thousandth of the dataset randomly
dataset_size = len(valid_rows) // 1000
sampled_rows = random.sample(valid_rows, dataset_size)

# Convert the sampled rows back to a DataFrame
df = pd.DataFrame(sampled_rows, columns=["user_id", "artistname", "trackname", "playlistname"])

In [4]:
# Create mappings for users, tracks, and playlists
user_mapping = {user: idx for idx, user in enumerate(df['user_id'].unique())}
track_mapping = {track: idx + len(user_mapping) for idx, track in enumerate(df['trackname'].unique())}
artist_mapping = {artist: idx + len(user_mapping) + len(track_mapping) for idx, artist in enumerate(df['artistname'].unique())}
playlist_mapping = {playlist: idx + len(user_mapping) + len(track_mapping) + len(artist_mapping) for idx, playlist in enumerate(df['playlistname'].unique())}

In [5]:
# Creating the nodes for the graph
user_nodes = df['user_id'].map(user_mapping).values
track_nodes = df['trackname'].map(track_mapping).values
artist_nodes = df['artistname'].map(artist_mapping).values
playlist_nodes = df['playlistname'].map(playlist_mapping).values

In [6]:
# Create edge_index (i.e., relationships between users, tracks, artists, and playlists)
# Edges are bidirectional between user and track, track and artist, track and playlist
user_to_track_edges = torch.tensor([user_nodes, track_nodes], dtype=torch.long)
track_to_artist_edges = torch.tensor([track_nodes, artist_nodes], dtype=torch.long)
track_to_playlist_edges = torch.tensor([track_nodes, playlist_nodes], dtype=torch.long)

  user_to_track_edges = torch.tensor([user_nodes, track_nodes], dtype=torch.long)


In [7]:
# Combine all edges
edge_index = torch.cat([user_to_track_edges, track_to_artist_edges, track_to_playlist_edges], dim=1)

In [8]:
# Create node features (random features here; you can use specific features like genre, track length, etc.)
num_nodes = len(user_mapping) + len(track_mapping) + len(artist_mapping) + len(playlist_mapping)
num_features = 32  # Can be adjusted
node_features = torch.randn(num_nodes, num_features)

In [9]:
# Build the target ratings or interactions for training (could be interaction count or binary like/dislike)
# Here, we are assuming interaction as edge weights, which could be 1 for presence of edge
edge_weights = torch.ones(edge_index.shape[1], dtype=torch.float)

# Create Data object
data = Data(x=node_features, edge_index=edge_index, edge_attr=edge_weights)

In [10]:
# Define the GNN model using Graph Convolutional Network (GCN)
class GNNModel(torch.nn.Module):
    def __init__(self, num_features, hidden_channels):
        super(GNNModel, self).__init__()
        self.conv1 = GCNConv(num_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.linear = Linear(hidden_channels, 1)
    
    def forward(self, x, edge_index, edge_attr):
        # Apply first graph convolution layer
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        # Apply second graph convolution layer
        x = self.conv2(x, edge_index)
        x = F.relu(x)

        # Predict ratings or recommendation score
        x = self.linear(x)
        return x

In [11]:
# Training function
def train_model(model, data, epochs=50):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    model.train()
    
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data.x, data.edge_index, data.edge_attr)
        
        # For simplicity, we use Mean Squared Error loss
        loss = F.mse_loss(out[data.edge_index[0]], data.edge_attr)
        loss.backward()
        optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1:03d}, Loss: {loss:.4f}')

In [12]:
# Making recommendations for a user
def get_recommendations(model, data, user_idx, top_k=5):
    model.eval()
    with torch.no_grad():
        embeddings = model.conv2(
            model.conv1(data.x, data.edge_index, data.edge_attr),
            data.edge_index
        )
        
        # Get the user embedding
        user_embedding = embeddings[user_idx]
        
        # Get track embeddings
        track_indices = torch.tensor(list(track_mapping.values()))
        track_embeddings = embeddings[track_indices]
        
        # Calculate similarity between user embedding and track embeddings
        similarity = F.cosine_similarity(user_embedding.unsqueeze(0), track_embeddings)
        
        # Get top-k most similar tracks
        top_k_indices = similarity.argsort(descending=True)[:top_k]
        
        recommended_tracks = [list(track_mapping.keys())[idx] for idx in top_k_indices]
        return recommended_tracks

In [13]:
# Initialize model
model = GNNModel(num_features=num_features, hidden_channels=64)
# Train the model
train_model(model, data)

  loss = F.mse_loss(out[data.edge_index[0]], data.edge_attr)


Epoch 010, Loss: 0.2122
Epoch 020, Loss: 0.1037
Epoch 030, Loss: 0.0567
Epoch 040, Loss: 0.0397
Epoch 050, Loss: 0.0299


In [14]:
torch.save(model, "song_recommender.pth")

In [15]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12890 entries, 0 to 12889
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   user_id       12890 non-null  object
 1   artistname    12890 non-null  object
 2   trackname     12890 non-null  object
 3   playlistname  12890 non-null  object
dtypes: object(4)
memory usage: 402.9+ KB


In [16]:
df.head()

Unnamed: 0,user_id,artistname,trackname,playlistname
0,c8a09fa0a90b94ac688923535315c4d9,Chicane feat Salt Tank,Leaving Town,Chill Out on Soundrop
1,c190f4de86d8621b5cc345f9c6d97f31,Vertical Church Band,I Will Follow,Worship
2,4398de6902abde3351347b048fcdc287,Middle Class Fashion,Perfect Person,Library & Streams (Part 19)
3,f2953b6092b4247c65ccba23dc993f43,Band of Horses,Long Vows,Starred
4,b358a18274eb1e8ecc8e731ff348c268,Tosca,Heidi Brühl,Hotel Costes


In [17]:
user_id = '11da254d9d1948488318e3ea286bf484'  # Example user_id
user_idx = user_mapping.get(user_id)
if user_idx is not None:
    recommended_tracks = get_recommendations(model, data, user_idx)
    print(f"Recommended tracks for user {user_id}: {recommended_tracks}")
else:
    print(f"User {user_id} not found.")

Recommended tracks for user 11da254d9d1948488318e3ea286bf484: ["Livin' Is Easy", 'Away We Go', 'Undertro', 'Woman', 'To Walk Alone']


In [18]:
loaded_model = torch.load("/kaggle/working/song_recommender.pth")
loaded_model.eval()

  loaded_model = torch.load("/kaggle/working/song_recommender.pth")


GNNModel(
  (conv1): GCNConv(32, 64)
  (conv2): GCNConv(64, 64)
  (linear): Linear(in_features=64, out_features=1, bias=True)
)

In [19]:
user_id = '11da254d9d1948488318e3ea286bf484'  # Example user_id
user_idx = user_mapping.get(user_id)
if user_idx is not None:
    recommended_tracks = get_recommendations(loaded_model, data, user_idx)
    print(f"Recommended tracks for user {user_id}: {recommended_tracks}")
else:
    print(f"User {user_id} not found.")

Recommended tracks for user 11da254d9d1948488318e3ea286bf484: ["Livin' Is Easy", 'Away We Go', 'Undertro', 'Woman', 'To Walk Alone']


In [20]:
%pip install torchviz

Collecting torchviz
  Downloading torchviz-0.0.3-py3-none-any.whl.metadata (2.1 kB)
Downloading torchviz-0.0.3-py3-none-any.whl (5.7 kB)
Installing collected packages: torchviz
Successfully installed torchviz-0.0.3
Note: you may need to restart the kernel to use updated packages.


In [21]:
from torchviz import make_dot

# Example forward pass to generate the computation graph
sample_output = model(data.x, data.edge_index, data.edge_attr)
dot_graph = make_dot(sample_output, params=dict(model.named_parameters()))

# Save the graph to a file
dot_graph.render("gnn_computation_graph", format="png")

'gnn_computation_graph.png'

In [22]:
import torch
import torch.nn.functional as F
from torch.nn import Linear
from torch_geometric.nn import GCNConv, GATConv, SAGEConv

# GCN Model
class GCNModel(torch.nn.Module):
    def __init__(self, num_features, hidden_channels):
        super(GCNModel, self).__init__()
        self.conv1 = GCNConv(num_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.linear = Linear(hidden_channels, 1)
    
    def forward(self, x, edge_index, edge_attr=None):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.linear(x)
        return x


# GAT Model
class GATModel(torch.nn.Module):
    def __init__(self, num_features, hidden_channels, heads=1):
        super(GATModel, self).__init__()
        self.conv1 = GATConv(num_features, hidden_channels, heads=heads)
        self.conv2 = GATConv(hidden_channels * heads, hidden_channels, heads=heads)
        self.linear = Linear(hidden_channels * heads, 1)
    
    def forward(self, x, edge_index, edge_attr=None):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.linear(x)
        return x


# GraphSAGE Model
class GraphSAGEModel(torch.nn.Module):
    def __init__(self, num_features, hidden_channels):
        super(GraphSAGEModel, self).__init__()
        self.conv1 = SAGEConv(num_features, hidden_channels)
        self.conv2 = SAGEConv(hidden_channels, hidden_channels)
        self.linear = Linear(hidden_channels, 1)
    
    def forward(self, x, edge_index, edge_attr=None):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.linear(x)
        return x


In [23]:
def train_model(model, data, epochs=50):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    model.train()
    
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data.x, data.edge_index)
        loss = F.mse_loss(out[data.edge_index[0]], data.edge_attr)
        loss.backward()
        optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1:03d}, Loss: {loss:.4f}')


In [24]:
def get_recommendations(model, data, user_idx, top_k=5):
    model.eval()
    with torch.no_grad():
        embeddings = model.conv2(model.conv1(data.x, data.edge_index))
        user_embedding = embeddings[user_idx]

        track_indices = torch.tensor(list(track_mapping.values()))
        track_embeddings = embeddings[track_indices]

        similarity = F.cosine_similarity(user_embedding.unsqueeze(0), track_embeddings)
        top_k_indices = similarity.argsort(descending=True)[:top_k]

        recommended_tracks = [list(track_mapping.keys())[idx] for idx in top_k_indices]
        return recommended_tracks


In [25]:
num_features = data.num_features
hidden_channels = 16

models = {
    "GCN": GCNModel(num_features, hidden_channels),
    "GAT": GATModel(num_features, hidden_channels),
    "GraphSAGE": GraphSAGEModel(num_features, hidden_channels),
}

for name, model in models.items():
    print(f"\nTraining {name} model...")
    train_model(model, data)



Training GCN model...


  loss = F.mse_loss(out[data.edge_index[0]], data.edge_attr)


Epoch 010, Loss: 0.1976
Epoch 020, Loss: 0.1058
Epoch 030, Loss: 0.0713
Epoch 040, Loss: 0.0492
Epoch 050, Loss: 0.0328

Training GAT model...
Epoch 010, Loss: 0.1404
Epoch 020, Loss: 0.0788
Epoch 030, Loss: 0.0480
Epoch 040, Loss: 0.0292
Epoch 050, Loss: 0.0181

Training GraphSAGE model...
Epoch 010, Loss: 0.0661
Epoch 020, Loss: 0.0336
Epoch 030, Loss: 0.0148
Epoch 040, Loss: 0.0064
Epoch 050, Loss: 0.0028
