In [1]:
import pandas as pd
import torch
from torch_geometric.data import Data
from torch_geometric.nn import SAGEConv
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import numpy as np
import random
from node2vec import Node2Vec
import networkx as nx
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.manifold import TSNE
import seaborn as sns
import matplotlib.pyplot as plt

# Read the cleaned node table and edge table from the working directory.
nodes_path, edges_path = "nodes_cleaned.csv", "edges_cleaned.csv"
nodes_df = pd.read_csv(nodes_path)
edges_df = pd.read_csv(edges_path)


  nodes_df = pd.read_csv("nodes_cleaned.csv")


In [2]:
# Keep only the first row per Spotify ID so every ID corresponds to exactly one row.
has_duplicate_ids = nodes_df['spotify_id'].duplicated().any()
if has_duplicate_ids:
    nodes_df = nodes_df.drop_duplicates(subset=['spotify_id']).reset_index(drop=True)

# Sanity check: abort if any duplicate ID is still present.
if not nodes_df['spotify_id'].is_unique:
    raise ValueError("Duplicate Spotify IDs found in nodes_df.")



In [3]:

# Map each Spotify ID to its row position in nodes_df; that position is the node index.
node_index_map = {spotify_id: idx for idx, spotify_id in enumerate(nodes_df['spotify_id'])}
edges_df['Source'] = edges_df['id_0'].map(node_index_map)
edges_df['Target'] = edges_df['id_1'].map(node_index_map)

# .map() yields NaN for endpoints that are not in nodes_df. Casting NaN to int64
# silently produces a garbage index, so such edges are dropped before conversion.
unmapped_mask = edges_df[['Source', 'Target']].isna().any(axis=1)
if unmapped_mask.any():
    print(f"Dropping {int(unmapped_mask.sum())} edges whose endpoints are missing from nodes_df.")
    edges_df = edges_df.loc[~unmapped_mask].reset_index(drop=True)
edges_df = edges_df.astype({'Source': 'int64', 'Target': 'int64'})

# Edge index with two rows: source node indices and target node indices.
edge_index = torch.tensor(edges_df[['Source', 'Target']].to_numpy().T, dtype=torch.long)


In [4]:

# Function to generate negative edges
def generate_negative_edges(num_nodes, existing_edges, num_samples):
    """
    Sample distinct node pairs that are not connected by an existing edge.

    The graph is treated as undirected: a pair is rejected if it exists in
    either orientation, and the same unordered pair is never returned twice.

    Args:
        num_nodes: Number of nodes; indices are drawn from ``[0, num_nodes)``.
        existing_edges: Tensor of shape ``(2, E)`` (row 0 = sources,
            row 1 = targets) holding the existing edges.
        num_samples: Number of negative pairs to return.

    Returns:
        ``torch.long`` tensor of shape ``(num_samples, 2)``; each row is a
        pair ``(i, j)`` with ``i != j``, in the orientation it was drawn.

    Raises:
        ValueError: If the graph has fewer than ``num_samples`` unconnected
            pairs (the rejection loop would otherwise never terminate).
    """
    def _canonical(a, b):
        # Orientation-independent key for an undirected pair.
        return (a, b) if a <= b else (b, a)

    # Unordered existing pairs; self-loops and out-of-range indices are ignored
    # because they can never collide with a sampled pair.
    existing_pairs = {
        _canonical(a, b)
        for a, b in existing_edges.T.tolist()
        if a != b and 0 <= a < num_nodes and 0 <= b < num_nodes
    }

    max_negatives = num_nodes * (num_nodes - 1) // 2 - len(existing_pairs)
    if num_samples > max_negatives:
        raise ValueError(
            f"Requested {num_samples} negative edges but only {max_negatives} "
            f"unconnected node pairs exist."
        )

    seen_pairs = set()
    negative_edges = []
    while len(negative_edges) < num_samples:
        i, j = (int(v) for v in np.random.randint(0, num_nodes, size=2))
        if i == j:
            continue
        key = _canonical(i, j)
        if key in existing_pairs or key in seen_pairs:
            continue
        seen_pairs.add(key)
        negative_edges.append((i, j))

    # reshape keeps the (N, 2) contract even when no samples were requested.
    return torch.tensor(negative_edges, dtype=torch.long).reshape(-1, 2)

# Draw one negative pair per row of edges_df so both classes have the same size.
num_negative_samples = edges_df.shape[0]
negative_edges = generate_negative_edges(
    num_nodes=len(nodes_df),
    existing_edges=edge_index,
    num_samples=num_negative_samples,
)



In [5]:
# Positive examples as an (E, 2) tensor: one row per existing edge.
positive_edges = edge_index.T

# Undirected NetworkX graph built from the edge list. Rebuilt on every run
# because it is cheap and later cells inspect `graph`.
graph = nx.Graph()
graph.add_edges_from(edge_index.T.tolist())

EMBEDDINGS_PATH = 'node_embeddings.npy'
EMBEDDING_DIM = 32

# Walk generation and training take minutes, so reuse the saved embeddings when
# they match the current node table. Delete the file to force a recompute,
# e.g. after the edge list changes without the node count changing.
try:
    node_embeddings = np.load(EMBEDDINGS_PATH)
except FileNotFoundError:
    node_embeddings = None

cache_is_valid = (
    node_embeddings is not None
    and node_embeddings.shape == (len(nodes_df), EMBEDDING_DIM)
)

if not cache_is_valid:
    # Initialize and fit Node2Vec. The fitted model gets its own name so it is
    # not confused with the GraphSAGE `model` defined later in the notebook.
    node2vec = Node2Vec(graph, dimensions=EMBEDDING_DIM, walk_length=10, num_walks=50, workers=8, p=1, q=2)
    n2v_model = node2vec.fit(window=5, min_count=1, batch_words=4)

    # One row per node index. Nodes the model has no vector for (looked up by
    # their string key) keep an all-zero embedding.
    node_embeddings = np.zeros((len(nodes_df), n2v_model.wv.vector_size), dtype=np.float32)
    for node in range(len(nodes_df)):
        key = str(node)
        if key in n2v_model.wv:
            node_embeddings[node] = n2v_model.wv[key]

    # Save the embeddings so later runs can skip the expensive step.
    np.save(EMBEDDINGS_PATH, node_embeddings)


Computing transition probabilities:   0%|          | 0/135058 [00:00<?, ?it/s]

Generating walks (CPU: 1): 100%|██████████| 7/7 [01:52<00:00, 16.10s/it]
Generating walks (CPU: 3): 100%|██████████| 6/6 [01:57<00:00, 19.54s/it]
Generating walks (CPU: 2): 100%|██████████| 7/7 [02:11<00:00, 18.79s/it]
Generating walks (CPU: 4): 100%|██████████| 6/6 [02:01<00:00, 20.30s/it]
Generating walks (CPU: 5): 100%|██████████| 6/6 [02:03<00:00, 20.56s/it]
Generating walks (CPU: 6): 100%|██████████| 6/6 [01:48<00:00, 18.05s/it]
Generating walks (CPU: 7): 100%|██████████| 6/6 [01:37<00:00, 16.23s/it]
Generating walks (CPU: 8): 100%|██████████| 6/6 [01:17<00:00, 12.98s/it]


KeyboardInterrupt: 

In [None]:
# Collect the node keys of `graph` and report how many there are.
node_keys = [node for node in graph.nodes]
print(len(node_keys))

In [None]:
# The precomputed node embeddings are the input node features.
node_features = torch.tensor(node_embeddings, dtype=torch.float)

# Labelled edge set: 1 for existing edges, 0 for sampled non-edges.
all_edges = torch.cat([positive_edges, negative_edges], dim=0)
labels = torch.cat([torch.ones(len(positive_edges)), torch.zeros(len(negative_edges))])

# Train-test split for edges
train_edges, test_edges, train_labels, test_labels = train_test_split(
    all_edges, labels, test_size=0.2, random_state=42
)

# Message passing must only see training positives. Using the full edge_index
# would feed held-out test edges into the GNN and inflate test accuracy.
# NOTE(review): node_embeddings were computed from the full graph, so the input
# features may still carry information about test edges.
train_positive_edges = train_edges[train_labels == 1]

# The rest of the notebook treats the graph as undirected, so add the reversed
# copy of every edge and remove duplicates.
message_edges = torch.cat([train_positive_edges, train_positive_edges.flip(1)], dim=0)
message_edge_index = torch.unique(message_edges, dim=0).T.contiguous()

# Converting to PyTorch Geometric Data
data = Data(x=node_features, edge_index=message_edge_index)



In [None]:


# Only a small subset of nodes is plotted, so the t-SNE perplexity is kept low
# (it has to stay below the number of points being embedded).
subset_size = 20
perplexity_value = 5

# Index labels of the `subset_size` rows with the largest 'followers' value,
# and their embeddings.
subset_indices = nodes_df.nlargest(subset_size, 'followers').index
subset_embeddings = node_embeddings[subset_indices]

# Project the subset down to two dimensions.
tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity_value)
reduced_embeddings = tsne.fit_transform(subset_embeddings)

# Scatter plot of the projected points, each annotated with the node's name.
fig, ax = plt.subplots(figsize=(10, 10))
sns.scatterplot(x=reduced_embeddings[:, 0], y=reduced_embeddings[:, 1], s=100, color="blue", ax=ax)

for (x_pos, y_pos), idx in zip(reduced_embeddings, subset_indices):
    ax.text(x_pos, y_pos, nodes_df.loc[idx, 'name'], fontsize=12)

ax.set_title(f"2D Visualization of Top {subset_size} Node Embeddings")
ax.set_xlabel("Dimension 1")
ax.set_ylabel("Dimension 2")
plt.show()


In [None]:
# Defining GraphSAGE Model for Link Prediction
class GraphSAGE(torch.nn.Module):
    """Two-layer GraphSAGE encoder with a linear edge scorer.

    ``forward`` produces node embeddings; ``predict_edges`` turns pairs of
    node embeddings into link probabilities.
    """

    def __init__(self, input_dim, hidden_dim):
        """
        Args:
            input_dim: Size of the input node feature vectors.
            hidden_dim: Output size of both SAGEConv layers.
        """
        super(GraphSAGE, self).__init__()
        self.conv1 = SAGEConv(input_dim, hidden_dim)
        self.conv2 = SAGEConv(hidden_dim, hidden_dim)
        # Scores the concatenation of two node embeddings, hence hidden_dim * 2.
        self.edge_predictor = torch.nn.Linear(hidden_dim * 2, 1)

    def forward(self, data):
        """Return node embeddings computed from ``data.x`` and ``data.edge_index``."""
        x = F.relu(self.conv1(data.x, data.edge_index))
        x = F.relu(self.conv2(x, data.edge_index))
        return x

    def predict_edges(self, x, edges):
        """Score candidate edges.

        Args:
            x: Node embedding matrix, indexed by node on dim 0.
            edges: Integer tensor of shape ``(N, 2)``; each row is a node pair.

        Returns:
            Tensor of shape ``(N,)`` with sigmoid scores in (0, 1).
        """
        # Concatenate source and target embeddings. The score therefore depends
        # on the order of the two nodes in each row.
        edge_embeds = torch.cat([x[edges[:, 0]], x[edges[:, 1]]], dim=1)
        # squeeze(-1) drops only the trailing size-1 dim. A bare squeeze() would
        # also collapse a single-edge batch to a 0-d tensor and break shape
        # agreement with the label vector.
        return torch.sigmoid(self.edge_predictor(edge_embeds)).squeeze(-1)

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model (input size taken from the node feature matrix) and its optimizer.
model = GraphSAGE(data.num_node_features, hidden_dim=32).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Move the graph and both edge splits to the same device as the model.
data = data.to(device)
train_edges = train_edges.to(device)
train_labels = train_labels.to(device)
test_edges = test_edges.to(device)
test_labels = test_labels.to(device)



In [None]:
# Single optimisation step over the whole training edge set
def train():
    """Run one full-batch training step and return the loss as a Python float.

    Uses the module-level ``model``, ``optimizer``, ``data``, ``train_edges``
    and ``train_labels``.
    """
    model.train()
    optimizer.zero_grad()

    # Encode all nodes, then score every training pair.
    scores = model.predict_edges(model(data), train_edges)
    targets = train_labels.float()

    loss = F.binary_cross_entropy(scores, targets)
    loss.backward()
    optimizer.step()
    return loss.item()


def test():
    """Evaluate the model on the held-out edges.

    Uses the module-level ``model``, ``data``, ``test_edges`` and
    ``test_labels``.

    Returns:
        Tuple ``(accuracy, y_true, y_pred)``: accuracy as a Python float, and
        the true and predicted labels as NumPy arrays.
    """
    model.eval()
    with torch.no_grad():
        scores = model.predict_edges(model(data), test_edges)
        # Threshold the scores at 0.5 to get hard 0/1 predictions.
        pred_labels = (scores > 0.5).float()
        num_correct = (pred_labels == test_labels.float()).sum()
        accuracy = num_correct / len(test_labels)

        return accuracy.item(), test_labels.cpu().numpy(), pred_labels.cpu().numpy()


In [None]:


NUM_EPOCHS = 200   # total number of full-batch training steps
EVAL_EVERY = 10    # evaluate and report every this many epochs
CLASS_LABELS = [0, 1]  # 0 = no collaboration, 1 = collaboration

for epoch in range(1, NUM_EPOCHS + 1):
    loss = train()
    if epoch % EVAL_EVERY != 0:
        continue

    acc, y_true, y_pred = test()
    # The labels arrive as floats (0.0 / 1.0); use integers for the sklearn metrics.
    y_true, y_pred = y_true.astype(int), y_pred.astype(int)
    print(f'Epoch {epoch:03d}, Loss: {loss:.4f}, Test Accuracy: {acc:.4f}')

    # Pin the label set so the matrix is always 2x2. Without it, a run where
    # only one class is present yields a 1x1 matrix and the 4-way unpack fails.
    cm = confusion_matrix(y_true, y_pred, labels=CLASS_LABELS)
    tn, fp, fn, tp = cm.ravel()  # Extract TN, FP, FN, TP

    print("\nConfusion Matrix:")
    print(f"True Negatives (TN): {tn}")
    print(f"False Positives (FP): {fp}")
    print(f"False Negatives (FN): {fn}")
    print(f"True Positives (TP): {tp}")

    print("\nInterpretation:")
    print("TN: Predicted no collaboration correctly (actual no collaboration)")
    print("FP: Predicted collaboration incorrectly (actual no collaboration)")
    print("FN: Predicted no collaboration incorrectly (actual collaboration)")
    print("TP: Predicted collaboration correctly (actual collaboration)")

    # Per-class precision / recall / F1. The labels are pinned for the same
    # reason as above, so target_names always matches the number of classes.
    report = classification_report(
        y_true, y_pred, labels=CLASS_LABELS, target_names=["No Collaboration", "Collaboration"]
    )
    print(f'\nClassification Report:\n{report}')


In [None]:


# Number of unseen pairs to sample
num_samples = 300

# Candidate pairs are node pairs with no existing edge in either direction.
# Reuse generate_negative_edges rather than a second hand-written rejection
# loop: the helper returns a set-backed (num_samples, 2) tensor, so the same
# pair is not scored and ranked twice.
unseen_pairs = generate_negative_edges(len(nodes_df), edge_index, num_samples).to(device)


In [None]:
# Score the sampled unseen pairs with the trained model.
# Evaluation mode and no gradient tracking are enough for inference.
model.eval()
with torch.no_grad():
    embeddings = model(data)  # node embeddings from the trained encoder
    predictions = model.predict_edges(x=embeddings, edges=unseen_pairs)  # one score per pair


In [None]:
# Rank unseen pairs by predicted score, highest first.
sorted_indices = torch.argsort(predictions, descending=True)
top_predictions = unseen_pairs[sorted_indices]
top_scores = predictions[sorted_indices]

# Convert node indices back to Spotify IDs: (spotify_id_1, spotify_id_2, score).
# A single positional array avoids building one Series per lookup via iloc.
spotify_ids = nodes_df['spotify_id'].to_numpy()
top_collaborations = [
    (spotify_ids[i], spotify_ids[j], score)
    for (i, j), score in zip(top_predictions.tolist(), top_scores.tolist())
]

# Spotify ID -> name lookup built once, instead of scanning the whole frame with
# a boolean mask for every name. drop_duplicates keeps the first row per ID,
# matching the first-match behaviour of a mask followed by `.values[0]`.
name_by_id = nodes_df.drop_duplicates(subset=['spotify_id']).set_index('spotify_id')['name']

# Display the top 10 predicted collaborations with artist names.
for spotify_id_1, spotify_id_2, score in top_collaborations[:10]:
    artist_1_name = name_by_id[spotify_id_1]
    artist_2_name = name_by_id[spotify_id_2]

    print(f"Artist 1: {artist_1_name}, Artist 2: {artist_2_name}, Predicted Score: {score:.4f}")

In [None]:

# Keep the ten highest-scoring pairs for the chart.
top_10_collaborations = top_collaborations[:10]


def _artist_name(spotify_id):
    """Return the 'name' of the first nodes_df row whose spotify_id matches."""
    return nodes_df.loc[nodes_df['spotify_id'] == spotify_id, 'name'].values[0]


# One "A & B" label and one score per pair.
artist_pairs = [
    f"{_artist_name(id_a)} & {_artist_name(id_b)}"
    for id_a, id_b, _score in top_10_collaborations
]
scores = [score for _id_a, _id_b, score in top_10_collaborations]

# Horizontal bar chart of the scores.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(artist_pairs, scores, color='skyblue')
ax.set_xlabel('Predicted Collaboration Score')
ax.set_title('Top 10 Predicted Artist Collaborations')
ax.invert_yaxis()  # highest score at the top
plt.show()
