# Rankify
## Analysis of a second-degree egocentric network created from a Spotify playlist using the PageRank algorithm
### Group Project - Group 7

**Objective:**
To study centrality measures and/or communities (or other parameters of interest) in a graph contextualized as a small-world network. The example must reflect the behavior of a real network.

**Reference:**
P. Mohan, P. Narayan and L. Sharma, "Egocentric Analysis of GitHub User Network," 2021.

## 1. Libraries and Authentication
We use `spotipy` to query the Spotify Web API, SageMath's `Graph` as the main graph object, and `networkx` for the GraphML export and the centrality and PageRank computations.

In [1]:
import spotipy
import networkx as nx
from spotipy.oauth2 import SpotifyClientCredentials

# --- CONFIGURATION ---
# Replace with your own credentials from Spotify for Developers
CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'

auth_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(auth_manager=auth_manager)

print("Authentication successful.")

Authentication successful.


## 2. Data Extraction
We fetch the playlist metadata and extract all unique artists. We store their Spotify IDs to query their top tracks later.

In [None]:
def get_playlist_data(playlist_url):
    """
    Retrieves the playlist name and a dictionary of unique artists {Name: ID}.
    """
    print("Connecting to Spotify...")
    
    # 1. Get full playlist object (Metadata + first 100 tracks)
    playlist_data = sp.playlist(playlist_url)
    playlist_name = playlist_data['name']
    print(f"Playlist found: '{playlist_name}'")
    
    # 2. Handle Pagination (if playlist > 100 songs)
    results = playlist_data['tracks']
    all_tracks = results['items']
    
    while results['next']:
        results = sp.next(results)
        all_tracks.extend(results['items'])
        
    print(f"Processing {len(all_tracks)} tracks...")
    
    # 3. Extract Artists (Name and ID)
    artist_map = {} 
    for item in all_tracks:
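        # Skip removed songs (the item or its track is None)
        if item is not None and item.get('track') is not None:
            track = item['track']
            for artist in track['artists']:
                # Artists without a Spotify ID (e.g. from local files) are
                # skipped, since they cannot be queried for top tracks later
                if artist.get('id'):
                    artist_map[artist['name']] = artist['id']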
                
    return playlist_name, artist_map

# --- EXECUTION ---
url = 'YOUR_PLAYLIST_URL_HERE'
playlist_name, original_artist_map = get_playlist_data(url)

print(f"Unique artists found: {len(original_artist_map)}")
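
To make the pagination and de-duplication steps easier to follow without calling the API, here is a minimal offline sketch. The `FakeClient` class, the `collect_artists` helper, and the page dictionaries are hypothetical stand-ins that only mimic the `items`/`next` shape used above; they are not part of `spotipy`. The sketch also assumes that entries without an artist ID should be skipped.

```python
# Hypothetical stand-in for the Spotify client: it only mimics the
# paginated shape ({'items': [...], 'next': ...}) used by get_playlist_data.
class FakeClient:
    def __init__(self, pages):
        self.pages = pages

    def next(self, page):
        # Return the page that the current page points to.
        return self.pages[page['next']]


def collect_artists(client, first_page):
    """Walk every page and return {artist name: artist id}."""
    items = list(first_page['items'])
    page = first_page
    while page['next'] is not None:
        page = client.next(page)
        items.extend(page['items'])

    artist_map = {}
    for item in items:
        # Skip removed tracks (the item or its track is None).
        if item is None or item.get('track') is None:
            continue
        for artist in item['track']['artists']:
            # Assumption: an artist without an ID cannot be queried later.
            if artist.get('id'):
                artist_map[artist['name']] = artist['id']
    return artist_map


# Made-up data: two pages, one repeated artist, one removed track,
# and one artist without an ID.
pages = {
    'page2': {
        'items': [
            {'track': {'artists': [{'name': 'Artist A', 'id': 'id_a'}]}},
            {'track': None},
        ],
        'next': None,
    }
}
first_page = {
    'items': [
        {'track': {'artists': [{'name': 'Artist A', 'id': 'id_a'},
                               {'name': 'Artist B', 'id': 'id_b'}]}},
        {'track': {'artists': [{'name': 'Local Band', 'id': None}]}},
    ],
    'next': 'page2',
}

artist_map = collect_artists(FakeClient(pages), first_page)
print(artist_map)
```

Because the result is a dictionary keyed by name, an artist who appears on several tracks is stored only once.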

## 3. Network Expansion (Egocentric Network)
We build the graph by:
1. Creating a central node (The Playlist).
2. Connecting original artists to the center.
3. Fetching "Top Tracks" for each artist to find collaborations.
4. Connecting artists based on these collaborations (discovering new nodes).

In [None]:
def create_egocentric_network(playlist_name, artist_map, verbosity=1):
    """
    Generates an expanded egocentric graph.
    verbosity: 
        0 = Silent.
        1 = General progress.
        2 = Detailed (API calls and connections).
    """
    G = Graph()  # SageMath undirected graph (available by default in a Sage kernel)
    
    # Lists to store edge types for coloring
    star_edges = []     # Playlist -> Artist
    internal_edges = [] # Artist A <-> Artist B (Both in playlist)
    external_edges = [] # Artist A -> Artist C (New discovery)
    
    original_names = set(artist_map.keys())
    total_artists = len(artist_map)
    
    if verbosity > 0:
        print(f"--- STARTING PROCESS ---")
        print(f"Target: Expand network for '{playlist_name}'")
        print(f"Seed nodes: {total_artists} artists.")
        print("-" * 30)

    # --- PHASE 1: Star Edges (Playlist -> Artist) ---
    for artist_name in artist_map.keys():
        star_edges.append((playlist_name, artist_name))
    
    G.add_edges(star_edges)

    # --- PHASE 2: Find Collaborations (Top Tracks) ---
    if verbosity > 0:
        print(f"[PHASE 2] Querying Spotify API for collaborations...")
        
    count = 0
    new_nodes_count = 0
    
    for origin_name, origin_id in artist_map.items():
        try:
            # API call: top tracks in the Spanish market ('ES');
            # change `country` to use another market, e.g. 'US'
            top_tracks = sp.artist_top_tracks(origin_id, country='ES')
            tracks_list = top_tracks['tracks']
            
            if verbosity >= 2:
                print(f"   > Analyzing {len(tracks_list)} tracks for: '{origin_name}'")

            for track in tracks_list:
                track_artists = track['artists']
                
                # If more than 1 artist, we have a collaboration
                if len(track_artists) > 1:
                    for colab_artist in track_artists:
                        colab_name = colab_artist['name']
                        
                        # Avoid self-loops
                        if colab_name == origin_name:
                            continue
                        
                        new_edge = (origin_name, colab_name)
                        
                        # CLASSIFY EDGE
                        if colab_name in original_names:
                            # Internal Collaboration
                            if not G.has_edge(new_edge):
                                internal_edges.append(new_edge)
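                        else:
                            # External Collaboration (possibly a New Node)
                            if not G.has_vertex(colab_name):
                                new_nodes_count += 1
                            # Record each external edge only once, so the
                            # final count is not inflated by repeated collaborations
                            if not G.has_edge(new_edge):
                                external_edges.append(new_edge)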
                            
                        G.add_edge(new_edge)
            
            count += 1
            if verbosity == 1 and count % 10 == 0:
                print(f"Progress... {count}/{total_artists} artists processed.")
                
        except Exception as e:
            print(f"!!! Error with artist '{origin_name}': {e}")
            continue

    if verbosity > 0:
        print("-" * 30)
        print(f"--- FINISHED ---")
        print(f"Total Final Nodes: {G.order()}")
        print(f"Total Edges: {G.size()}")
        print(f"   - Internal (between originals): {len(internal_edges)}")
        print(f"   - External (new discoveries): {len(external_edges)}")

    return G, star_edges, internal_edges, external_edges

# --- EXECUTION ---
G, edges_star, edges_in, edges_out = create_egocentric_network(playlist_name, original_artist_map, verbosity=1)
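
The edge classification can be checked on a toy example without Spotify or SageMath. The sketch below is an illustration under assumptions: `classify_collaborations` and the `track_credits` data are hypothetical, and edges are stored as `frozenset` pairs to reflect that the graph is undirected.

```python
def classify_collaborations(playlist_name, seed_artists, track_credits):
    """
    seed_artists: set of artist names in the playlist.
    track_credits: {seed artist: [artist names credited on each top track]}.
    Returns (star, internal, external) sets of undirected edges.
    """
    # Star edges: the playlist is linked to every original artist.
    star = {frozenset((playlist_name, a)) for a in seed_artists}
    internal, external = set(), set()

    for origin, tracks in track_credits.items():
        for credited in tracks:
            if len(credited) < 2:
                continue  # solo track, no collaboration
            for other in credited:
                if other == origin:
                    continue  # avoid self-loops
                edge = frozenset((origin, other))
                if other in seed_artists:
                    internal.add(edge)   # both artists are in the playlist
                else:
                    external.add(edge)   # the collaborator is a new discovery
    return star, internal, external


# Made-up example: A and B are in the playlist, C is discovered through A.
seeds = {'A', 'B'}
track_credits = {
    'A': [['A'], ['A', 'B'], ['A', 'C'], ['A', 'C']],
    'B': [['B', 'A']],
}
star, internal, external = classify_collaborations('My Playlist', seeds, track_credits)

print(len(star), len(internal), len(external))
```

Using sets means that the A-B collaboration is counted once even though it shows up in the top tracks of both artists, and the repeated A-C collaboration is also counted once.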

## 4. Visualization
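We visualize the network using SageMath's `plot` method, with the following color scheme:
* **Red Node:** The Playlist (Center).
* **Blue Nodes:** Original Artists.
* **Gray Nodes:** Discovered Artists.
* **Red Edges:** Playlist to Original Artist.
* **Green Edges:** Collaborations between two Original Artists.
* **Light Gray Edges:** Collaborations with a Discovered Artist.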

In [None]:
# Define node colors
all_nodes = G.vertices()
playlist_node = [playlist_name]
original_nodes = list(original_artist_map.keys())
# New nodes = All - (Playlist + Originals)
new_nodes = list(set(all_nodes) - set(original_nodes) - set(playlist_node))

node_colors = {
    'red': playlist_node,
    'blue': original_nodes,
    'lightgray': new_nodes
}

# Define edge colors
edge_colors = {
    '#ff0000': edges_star,  # Red
    '#00ff00': edges_in,    # Green
    '#d3d3d3': edges_out    # Light Gray
}

# Plot
G.plot(
    layout='spring',
    iterations=50,
    vertex_size=50,
    vertex_labels=False, # Set to True to see names
    vertex_colors=node_colors,
    edge_colors=edge_colors,
    figsize=[12, 12],
    title=f"Network of: {playlist_name}"
).show()

## 5. Export to Gephi
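We export the graph to GraphML format so it can be opened in Gephi. Numeric literals typed in a Sage notebook become Sage number types, which the `networkx` GraphML writer cannot serialize, so we cast every numeric attribute to a plain Python `float` first.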

In [None]:
def export_graph_to_gephi(G_sage, playlist_name, artist_map, filename="spotify_network.graphml", calculate_centrality=True):
    """
    Exports the SageMath graph to GraphML compatible with Gephi.
    Handles type conversion (Sage RealLiteral -> Python float).
    """
    print(f"--- Exporting to {filename} ---")
    
    NX_G = G_sage.networkx_graph()
    
    # Calculate Centrality
    if calculate_centrality:
        print("Calculating Betweenness Centrality...")
        bb = nx.betweenness_centrality(NX_G)
        # Defensive cast: make sure every value is a plain Python float
        bb_python = {k: float(v) for k, v in bb.items()}
        nx.set_node_attributes(NX_G, bb_python, 'centrality')

    # Assign Visual Attributes
    print("Assigning attributes...")
    for node in NX_G.nodes():
        attrs = {}
        
        # 1. SOURCE (Playlist)
        if node == playlist_name:
            attrs = {
                'type': 'Source',
                'viz_color': '#ff0000', 
                'size': float(50.0),
                'label': str(node)
            }
        # 2. ORIGINAL
        elif node in artist_map:
            attrs = {
                'type': 'Original',
                'viz_color': '#0000ff', 
                'size': float(30.0),
                'label': str(node)
            }
        # 3. DISCOVERY
        else:
            attrs = {
                'type': 'Discovery',
                'viz_color': '#a9a9a9', 
                'size': float(10.0),
                'label': str(node)
            }
            
        NX_G.nodes[node].update(attrs)

    try:
        nx.write_graphml(NX_G, filename)
        print(f"✅ Success! File saved: {filename}")
    except Exception as e:
        print(f"❌ Error saving file: {e}")

# --- EXECUTION ---
export_graph_to_gephi(G, playlist_name, original_artist_map, "rankify_export.graphml")

## 6. Recommendation System (PageRank)
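We apply the PageRank algorithm to rank the "discovered" artists by their position in the network. By default we use personalized PageRank, where the random walk restarts only at the playlist and its original artists, so the ranking reflects the user's taste rather than global popularity within the graph.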

In [None]:
def recommend_new_artists(G_sage, playlist_name, original_artist_map, use_personalized=True):
    """
    Calculates PageRank and filters to show only discovered artists.
    """
    NX_G = G_sage.networkx_graph()
    
    known_nodes = set(original_artist_map.keys())
    known_nodes.add(playlist_name)
    
    print(f"Calculating ranking on {NX_G.number_of_nodes()} nodes...")

    if use_personalized:
        print("--- Mode: Personalized PageRank (Focused on your input) ---")
        # Restart weight 1 for known nodes, 0 for discovered ones
        # (networkx normalizes these weights so they sum to 1)
        personalization = {node: 1 if node in known_nodes else 0 for node in NX_G.nodes()}
        scores = nx.pagerank(NX_G, alpha=0.85, personalization=personalization)
    else:
        print("--- Mode: Global PageRank ---")
        scores = nx.pagerank(NX_G, alpha=0.85)

    # Filter and Sort
    new_artist_ranking = []
    for artist, score in scores.items():
        if artist not in known_nodes:
            new_artist_ranking.append((artist, score))
    
    new_artist_ranking.sort(key=lambda x: x[1], reverse=True)
    
    return new_artist_ranking

# --- EXECUTION ---
top_discoveries = recommend_new_artists(G, playlist_name, original_artist_map, use_personalized=True)

# --- RESULTS ---
print("\n" + "="*40)
print(f"TOP 10 RECOMMENDED ARTISTS")
print("="*40)

for i, (artist, score) in enumerate(top_discoveries[:10]):
    # Fix: Cast score to float for formatting
    print(f"{i+1}. {artist} (Score: {float(score):.5f})")

if len(top_discoveries) == 0:
    print("No new artists found.")
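
To see why personalization favors artists close to the playlist, the ranking can be reproduced on a toy graph with a plain power iteration. This is a minimal sketch, not the `networkx` implementation: `personalized_pagerank` and the adjacency data are hypothetical, and the graph is assumed to be undirected with no isolated nodes. It mirrors the settings above (`alpha=0.85`, restart mass spread uniformly over the known nodes).

```python
def personalized_pagerank(adjacency, seeds, alpha=0.85, tol=1e-10, max_iter=200):
    """
    adjacency: {node: set of neighbours}, undirected, no isolated nodes.
    seeds: nodes that share the restart (teleport) probability uniformly.
    """
    nodes = list(adjacency)
    teleport = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    rank = {n: 1.0 / len(nodes) for n in nodes}

    for _ in range(max_iter):
        # With probability (1 - alpha) the walk restarts at a seed node.
        new_rank = {n: (1.0 - alpha) * teleport[n] for n in nodes}
        # With probability alpha it moves to a uniformly chosen neighbour.
        for n in nodes:
            share = alpha * rank[n] / len(adjacency[n])
            for neighbour in adjacency[n]:
                new_rank[neighbour] += share
        change = sum(abs(new_rank[n] - rank[n]) for n in nodes)
        rank = new_rank
        if change < tol:
            break
    return rank


# Made-up graph: playlist P with original artists A and B.
# C collaborates with both A and B; D collaborates only with A.
adjacency = {
    'P': {'A', 'B'},
    'A': {'P', 'C', 'D'},
    'B': {'P', 'C'},
    'C': {'A', 'B'},
    'D': {'A'},
}
known = {'P', 'A', 'B'}

scores = personalized_pagerank(adjacency, known)
ranking = sorted(
    ((n, s) for n, s in scores.items() if n not in known),
    key=lambda pair: pair[1],
    reverse=True,
)
for name, score in ranking:
    print(f"{name}: {score:.5f}")
```

In this toy graph C ranks above D, because C receives score from two original artists while D receives it from only one.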