In [9]:
import igraph as ig


track_network_path = "../network-parser/output/track_graph.graphml"
artist_network_path = "../network-parser/output/artist_graph.graphml"

# read graphml files
track_network = ig.Graph.Read_GraphML(track_network_path)
# artist_network = ig.Graph.Read_GraphML(artist_network_path)

# force convert to undirected
track_network.to_undirected()
# artist_network.to_undirected()

# force convert to simple
# track_network.simplify()
# artist_network.simplify()

# # confirm
# print(track_network.is_simple())
# print(artist_network.is_simple())

graph = track_network



# Plain Random Walk

In [17]:
import igraph as ig
import random
import json
from tqdm.notebook import tqdm
import logging
logger = logging.getLogger(__name__)

# setup logging to print all info
logging.basicConfig(level=logging.INFO)

# Step 1: Load the graph from GraphML
# graph = ig.Graph.Read_GraphML(track_network_path)
# graph.to_undirected()  # Ensure the graph is undirected

random.seed(42)

# Step 2: Random walk function avoiding tracks in user's playlist
def weighted_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps):
    walk = [start_node]
    current_node = start_node

    for _ in range(num_steps):
        neighbors = graph.neighbors(current_node, mode="all")

        # Exclude neighbors associated with the user's playlist ID
        valid_neighbors = [
            neighbor for neighbor in neighbors
            #if user_pid not in map(int, graph.vs[neighbor]["playlists"].split(","))
        ]
        if not valid_neighbors:
            break  # Stop if no valid neighbors

        edge_ids = [graph.get_eid(current_node, neighbor) for neighbor in valid_neighbors]
        weights = [graph.es[edge_id]["weight"] for edge_id in edge_ids]

        next_node = random.choices(valid_neighbors, weights=weights, k=1)[0]
        walk.append(next_node)
        current_node = next_node

    return walk

# Step 3: Recommendation function
def recommend_tracks_by_pid(graph, user_pid, user_track, num_steps=5, top_n=1):
    try:
        start_node = graph.vs.find(id=user_track).index
    except ValueError:
        # raise ValueError(f"Track '{user_track}' not found in the graph.")
        # print('nope')
        return None

    walk = weighted_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps)

    visit_counts = {}
    for node in walk:
        track_id = graph.vs[node]["id"]
        #if user_pid not in map(int, graph.vs[node]["playlists"].split(",")):
        visit_counts[track_id] = visit_counts.get(track_id, 0) + 1

    recommended_tracks = sorted(visit_counts, key=visit_counts.get, reverse=True)
    eligible_tracks = [track for track in recommended_tracks if track != user_track]
    return eligible_tracks[:top_n]

# Step 4: Accuracy testing function
def test_recommendation_accuracy(graph, test_data_path, num_steps=5, top_n=1):
    with open(test_data_path, "r") as f:
        test_data = json.load(f)

    playlists = test_data["playlists"]
    total_tests = 0
    successful_tests = 0

    # Add tqdm progress bar for playlists
    for playlist in tqdm(playlists, desc="Testing playlists"):
        user_pid = playlist["pid"]
        input_track_index = 0
        input_track = playlist["tracks"][input_track_index]["track_uri"]
        ground_truth_tracks = {track["track_uri"] for track in playlist["tracks"]}
        
        try:
            while input_track_index < len(playlist["tracks"]):
                input_track = playlist["tracks"][input_track_index]["track_uri"]
                recommended_tracks = recommend_tracks_by_pid(
                    graph, user_pid, input_track, num_steps, top_n
                )
                logger.info(f"Recommended tracks: {recommended_tracks} for input track: {input_track}")
                if recommended_tracks is None:
                    input_track_index += 1
                    continue
                else:
                    total_tests += 1
                    if any(track in ground_truth_tracks for track in recommended_tracks):
                        successful_tests += 1
                    break
        except ValueError as e:
            print(e)

    accuracy = successful_tests / total_tests if total_tests > 0 else 0
    print(f"\nAccuracy: {accuracy * 100:.2f}% ({successful_tests}/{total_tests})")
    return accuracy

# Step 5: Run the test
test_data_path = "../dataset/data/mpd.slice.999000-999999.json"  # Path to your test JSON file
test_recommendation_accuracy(graph, test_data_path, num_steps=10, top_n=1)


Testing playlists:   0%|          | 0/1000 [00:00<?, ?it/s]

INFO:__main__:Recommended tracks: ['spotify:track:2HUAjeHWA7FQNbnWhlboOL'] for input track: spotify:track:7fnqltLx83HsYLQajzCYRk
INFO:__main__:Recommended tracks: None for input track: spotify:track:2b0ESQ5Usf0yEjkq86xI8p
INFO:__main__:Recommended tracks: ['spotify:track:0tz5hpORwy8PdhsSte7zUL'] for input track: spotify:track:3pfXxHoydFRfD7IBGJTQAN
INFO:__main__:Recommended tracks: None for input track: spotify:track:172rZlTV0yE2La5N19clgq
INFO:__main__:Recommended tracks: None for input track: spotify:track:3OGZfxVRLrJPzrkrR4hYiw
INFO:__main__:Recommended tracks: ['spotify:track:2aksifNn5ph8igDOkPBA02'] for input track: spotify:track:2XMTqoHHSH0lvuXrvIEdco
INFO:__main__:Recommended tracks: None for input track: spotify:track:19EJ7onLbhD1jkgffePLSp
INFO:__main__:Recommended tracks: ['spotify:track:0qcr5FMsEO85NAQjrlDRKo'] for input track: spotify:track:5VIfacsWytkcgr7aTt8Tql
INFO:__main__:Recommended tracks: ['spotify:track:3E2Zh20GDCR9B1EYjfXWyv'] for input track: spotify:track:01XFgR


Accuracy: 3.04% (30/987)


0.030395136778115502

# Node2Vec Random Walk

In [21]:
import igraph as ig
import random
import json

# Step 1: Load the graph from GraphML
# graph = ig.Graph.Read_GraphML("../network-parser/track_graph.graphml")
# graph.to_undirected()  # Ensure the graph is undirected

# random.seed(43)

# Step 2: Define the Node2Vec random walk function
def node2vec_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps, p=1.0, q=1.0):
    """
    Perform a Node2Vec-style random walk while avoiding tracks in the user's playlist ID.
    
    Args:
        graph (igraph.Graph): The input graph with edge weights.
        start_node (int): The starting node index for the random walk.
        user_pid (int): The user's playlist ID to exclude tracks from.
        num_steps (int): Number of steps for the random walk.
        p (float): Return parameter (controls likelihood of returning to the previous node).
        q (float): In-out parameter (controls exploration of closer vs. farther nodes).
        
    Returns:
        list: A list of visited node indices during the random walk.
    """
    walk = [start_node]
    current_node = start_node
    previous_node = None

    for _ in range(num_steps):
        neighbors = graph.neighbors(current_node, mode="all")

        # Exclude neighbors associated with the user's playlist ID
        valid_neighbors = [
            neighbor for neighbor in neighbors
            #if user_pid not in map(int, graph.vs[neighbor]["playlists"].split(","))
        ]
        if not valid_neighbors:
            break  # Stop if no valid neighbors

        edge_ids = [graph.get_eid(current_node, neighbor) for neighbor in valid_neighbors]
        weights = []

        for neighbor, edge_id in zip(valid_neighbors, edge_ids):
            if previous_node is not None and neighbor == previous_node:
                # Bias for returning to the previous node
                weights.append(graph.es[edge_id]["weight"] / p)
            elif previous_node is not None:
                # Bias for exploring farther or closer nodes
                distance_factor = 1.0 if neighbor in graph.neighbors(previous_node) else q
                weights.append(graph.es[edge_id]["weight"] / distance_factor)
            else:
                # First step, no previous node
                weights.append(graph.es[edge_id]["weight"])

        next_node = random.choices(valid_neighbors, weights=weights, k=1)[0]
        walk.append(next_node)
        previous_node = current_node
        current_node = next_node

    return walk

# Step 3: Recommendation function
def recommend_tracks_by_pid_node2vec(graph, user_pid, user_track, num_steps=5, top_n=1, p=1.0, q=1.0):
    """
    Recommend tracks based on a Node2Vec random walk avoiding tracks in the user's playlist.
    
    Args:
        graph (igraph.Graph): The input graph with edge weights.
        user_pid (int): The user's playlist ID to exclude tracks from.
        user_track (str): The track ID of the starting node.
        num_steps (int): Number of steps for the random walk.
        top_n (int): Number of recommendations to return.
        p (float): Return parameter (controls likelihood of returning to the previous node).
        q (float): In-out parameter (controls exploration of closer vs. farther nodes).
        
    Returns:
        list: A list of recommended track IDs.
    """
    try:
        start_node = graph.vs.find(id=user_track).index
    except ValueError:
        #raise ValueError(f"Track '{user_track}' not found in the graph.")
        return None

    walk = node2vec_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps, p, q)

    visit_counts = {}
    for node in walk:
        track_id = graph.vs[node]["id"]
        #if user_pid not in map(int, graph.vs[node]["playlists"].split(",")):
        visit_counts[track_id] = visit_counts.get(track_id, 0) + 1

    recommended_tracks = sorted(visit_counts, key=visit_counts.get, reverse=True)
    eligible_tracks = [track for track in recommended_tracks if track != user_track]
    return eligible_tracks[:top_n]

# Step 4: Accuracy testing function
def test_recommendation_accuracy_node2vec(graph, test_data_path, num_steps=5, top_n=1, p=1.0, q=1.0):
    with open(test_data_path, "r") as f:
        test_data = json.load(f)

    playlists = test_data["playlists"]
    total_tests = 0
    successful_tests = 0

    # Add tqdm progress bar for playlists
    for playlist in tqdm(playlists, desc="Testing playlists"):
        user_pid = playlist["pid"]
        input_track_index = 0
        input_track = playlist["tracks"][input_track_index]["track_uri"]
        ground_truth_tracks = {track["track_uri"] for track in playlist["tracks"]}
        
        try:
            while input_track_index < len(playlist["tracks"]):
                input_track = playlist["tracks"][input_track_index]["track_uri"]
                recommended_tracks = recommend_tracks_by_pid_node2vec(
                    graph, user_pid, input_track, num_steps, top_n
                )
                logger.info(f"Recommended tracks: {recommended_tracks} for input track: {input_track}")
                if recommended_tracks is None:
                    input_track_index += 1
                    continue
                else:
                    total_tests += 1
                    if any(track in ground_truth_tracks for track in recommended_tracks):
                        successful_tests += 1
                    break
        except ValueError as e:
            print(e)

    accuracy = successful_tests / total_tests
    print(f"Accuracy: {accuracy * 100:.2f}% ({successful_tests}/{total_tests})")
    return accuracy


# Step 5: Run the test
test_data_path = "../dataset/data/mpd.slice.999000-999999.json"  # Path to your test JSON file
test_recommendation_accuracy_node2vec(graph, test_data_path, p=1.0, q=2.0, num_steps=10, top_n=1)


Testing playlists:   0%|          | 0/1000 [00:00<?, ?it/s]

INFO:__main__:Recommended tracks: ['spotify:track:6l8oNVFkqdG5kDBv0DalCU'] for input track: spotify:track:7fnqltLx83HsYLQajzCYRk
INFO:__main__:Recommended tracks: None for input track: spotify:track:2b0ESQ5Usf0yEjkq86xI8p
INFO:__main__:Recommended tracks: ['spotify:track:0wpbWr0vI9mIV6WpO5CqPi'] for input track: spotify:track:3pfXxHoydFRfD7IBGJTQAN
INFO:__main__:Recommended tracks: None for input track: spotify:track:172rZlTV0yE2La5N19clgq
INFO:__main__:Recommended tracks: None for input track: spotify:track:3OGZfxVRLrJPzrkrR4hYiw
INFO:__main__:Recommended tracks: ['spotify:track:0zMzyHAeMvwq5CRstru1Fp'] for input track: spotify:track:2XMTqoHHSH0lvuXrvIEdco
INFO:__main__:Recommended tracks: None for input track: spotify:track:19EJ7onLbhD1jkgffePLSp
INFO:__main__:Recommended tracks: ['spotify:track:7jYo1uYSMzQS0nlN6bRAKM'] for input track: spotify:track:5VIfacsWytkcgr7aTt8Tql
INFO:__main__:Recommended tracks: ['spotify:track:2QQuKHdhPoAJchgY0HPw96'] for input track: spotify:track:01XFgR

Accuracy: 2.03% (20/987)


0.020263424518743668

# Restart Random Walk

In [22]:
import igraph as ig
import random
import json

# Step 1: Load the graph from GraphML
# graph = ig.Graph.Read_GraphML("../network-parser/track_graph.graphml")
# graph.to_undirected()  # Ensure the graph is undirected

# random.seed(44)

# Step 2: Define the Restart Random Walk function
def restart_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps, restart_prob=0.15):
    """
    Perform a restart random walk while avoiding tracks in the user's playlist ID.
    
    Args:
        graph (igraph.Graph): The input graph with edge weights.
        start_node (int): The starting node index for the random walk.
        user_pid (int): The user's playlist ID to exclude tracks from.
        num_steps (int): Number of steps for the random walk.
        restart_prob (float): Probability of restarting the walk at the starting node.
        
    Returns:
        list: A list of visited node indices during the random walk.
    """
    walk = [start_node]
    current_node = start_node

    for _ in range(num_steps):
        if random.random() < restart_prob:
            # Restart to the starting node
            current_node = start_node
        else:
            neighbors = graph.neighbors(current_node, mode="all")

            # Exclude neighbors associated with the user's playlist ID
            valid_neighbors = [
                neighbor for neighbor in neighbors
                #if user_pid not in map(int, graph.vs[neighbor]["playlists"].split(","))
            ]
            if not valid_neighbors:
                break  # Stop if no valid neighbors

            # Get edge weights for valid neighbors
            edge_ids = [graph.get_eid(current_node, neighbor) for neighbor in valid_neighbors]
            weights = [graph.es[edge_id]["weight"] for edge_id in edge_ids]

            current_node = random.choices(valid_neighbors, weights=weights, k=1)[0]

        walk.append(current_node)

    return walk

# Step 3: Recommendation function
def recommend_tracks_by_pid_restart(graph, user_pid, user_track, num_steps=5, top_n=1, restart_prob=0.15):
    """
    Recommend tracks based on a restart random walk avoiding tracks in the user's playlist.
    
    Args:
        graph (igraph.Graph): The input graph with edge weights.
        user_pid (int): The user's playlist ID to exclude tracks from.
        user_track (str): The track ID of the starting node.
        num_steps (int): Number of steps for the random walk.
        top_n (int): Number of recommendations to return.
        restart_prob (float): Probability of restarting the walk at the starting node.
        
    Returns:
        list: A list of recommended track IDs.
    """
    try:
        start_node = graph.vs.find(id=user_track).index
    except ValueError:
        #raise ValueError(f"Track '{user_track}' not found in the graph.")
        return None

    walk = restart_random_walk_exclude_user_pid(graph, start_node, user_pid, num_steps, restart_prob)

    visit_counts = {}
    for node in walk:
        track_id = graph.vs[node]["id"]
        #if user_pid not in map(int, graph.vs[node]["playlists"].split(",")):
        visit_counts[track_id] = visit_counts.get(track_id, 0) + 1

    recommended_tracks = sorted(visit_counts, key=visit_counts.get, reverse=True)
    eligible_tracks = [track for track in recommended_tracks if track != user_track]
    return eligible_tracks[:top_n]

# Step 4: Accuracy testing function
def test_recommendation_accuracy_restart(graph, test_data_path, num_steps=5, top_n=1, restart_prob=0.15):
    with open(test_data_path, "r") as f:
        test_data = json.load(f)

    playlists = test_data["playlists"]
    total_tests = 0
    successful_tests = 0

    for playlist in tqdm(playlists, desc="Testing playlists"):
        user_pid = playlist["pid"]
        input_track_index = 0
        input_track = playlist["tracks"][input_track_index]["track_uri"]  # First track in the playlist
        ground_truth_tracks = {track["track_uri"] for track in playlist["tracks"]}
        
        try:
            while input_track_index < len(playlist["tracks"]):
                input_track = playlist["tracks"][input_track_index]["track_uri"]
                recommended_tracks = recommend_tracks_by_pid_restart(
                    graph, user_pid, input_track, num_steps, top_n, restart_prob
                )
                if recommended_tracks is None:
                    input_track_index += 1
                    continue
                else:
                    total_tests += 1
                    # Check if any recommended track is in the ground truth tracks
                    if any(track in ground_truth_tracks for track in recommended_tracks):
                        successful_tests += 1
                    break
        except ValueError as e:
            print(e)  # Handle cases where the track is not found

    accuracy = successful_tests / total_tests
    print(f"Accuracy: {accuracy * 100:.2f}% ({successful_tests}/{total_tests})")
    return accuracy


# Step 5: Run the test
test_data_path = "../dataset/data/mpd.slice.999000-999999.json"  # Path to your test JSON file
test_recommendation_accuracy_restart(graph, test_data_path, restart_prob=0.15, num_steps=50, top_n=1)


Testing playlists:   0%|          | 0/1000 [00:00<?, ?it/s]

Accuracy: 3.04% (30/987)


0.030395136778115502

# Community Detection

In [24]:
import igraph as ig

# Step 1: Load the graph and ensure it's undirected
# graph = ig.Graph.Read_GraphML("../network-parser/track_graph.graphml")
# graph.to_undirected()

# Step 2: Perform community detection
communities = graph.community_multilevel()

# Step 3: Compute node degrees
degrees = graph.degree()

# Step 4: Get top nodes in each community
top_nodes_per_community = {}
for i, community in enumerate(communities):
    # Rank nodes by degree within the community
    ranked_nodes = sorted(community, key=lambda node: degrees[node], reverse=True)
    # Get top 5 nodes
    top_nodes_per_community[i] = ranked_nodes[:5]

# Step 5: Print results
print(f"Number of communities detected: {len(communities)}")
for community_id, top_nodes in top_nodes_per_community.items():
    print(f"\nTop nodes in Community {community_id}:")
    for node in top_nodes:
        print(f"  Node {graph.vs[node]['id']} with degree {degrees[node]}")


Number of communities detected: 72

Top nodes in Community 0:
  Node spotify:track:1snNAXmmPXCn0dkF9DaPWw with degree 2340
  Node spotify:track:0dPBlz8N0veeceejtuvpPz with degree 1949
  Node spotify:track:6ZT3coOj97F6CVvruPtnox with degree 1924
  Node spotify:track:0k93MXOj0kSXo84SvSDeUz with degree 1780
  Node spotify:track:6vjWim6xU6tPjA42GxSp1U with degree 1673

Top nodes in Community 1:
  Node spotify:track:2GiJYvgVaD2HtM8GqD9EgQ with degree 2950
  Node spotify:track:3ZMv9EzGoteNi5Qnx0KpEO with degree 2800
  Node spotify:track:4llK75pXNWZz6KAho2Gp16 with degree 2751
  Node spotify:track:3n69hLUdIsSa1WlRmjMZlW with degree 2597
  Node spotify:track:13HVjjWUZFaWilh2QUJKsP with degree 2573

Top nodes in Community 2:
  Node spotify:track:1tMyqN7bNCokdg7jWgKPc8 with degree 115
  Node spotify:track:1y58wFuYYnVsCnY8DsOELY with degree 115
  Node spotify:track:5ib3EGG06XUnUf7hzDnheL with degree 115
  Node spotify:track:5zLvo9lek82KN9QKRpJojD with degree 82
  Node spotify:track:7tor1N7ix61iCk

# Community based random walk

In [26]:
# specify test data
test_data_path = "../dataset/data/mpd.slice.999000-999999.json"

# community detection
communities = graph.community_multilevel()

# convert communities to induced subgraph
subgraphs = [graph.induced_subgraph(community) for community in communities]

# combine subgraphs using disjoint_union
combined_graph = subgraphs[0]  # Start with first subgraph
for sg in subgraphs[1:]:
    combined_graph = combined_graph.disjoint_union(sg)

# test recommendation accuracy
test_recommendation_accuracy_restart(combined_graph, test_data_path, restart_prob=0.15, num_steps=50, top_n=1)

Testing playlists:   0%|          | 0/1000 [00:00<?, ?it/s]

Accuracy: 3.95% (39/987)


0.03951367781155015