### Make spotipy work

In [1]:
%%capture
!pip install spotipy
import networkx as nx
import pandas as pd
import spotipy
from spotipy . oauth2 import SpotifyClientCredentials
import matplotlib.pyplot as plt

In [2]:
import os

# Spotify API credentials are read from the environment instead of being
# hard-coded, so the client secret never ends up in the notebook or in
# version control. SPOTIPY_CLIENT_ID / SPOTIPY_CLIENT_SECRET are the variable
# names spotipy itself falls back to when no explicit credentials are given.
# NOTE(review): the secret that used to be written here in plain text should
# be treated as leaked and rotated in the Spotify developer dashboard.
CLIENT_ID = os.environ.get("SPOTIPY_CLIENT_ID")
CLIENT_SECRET = os.environ.get("SPOTIPY_CLIENT_SECRET")

# Client-credentials flow: authenticates the application, not a user.
auth_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(auth_manager=auth_manager)

### Main

In [154]:
import networkx as nx
import pandas as pd
import copy
import numpy as np
import itertools
from numpy import dot
from numpy.linalg import norm

# ------- IMPLEMENT HERE ANY AUXILIARY FUNCTIONS NEEDED ------- #


# --------------- END OF AUXILIARY FUNCTIONS ------------------ #

def retrieve_bidirectional_edges(g: nx.DiGraph, out_filename: str) -> nx.Graph:
    """
    Convert a directed graph into an undirected graph by considering bidirectional edges only.

    An undirected edge {u, v} is created only when both (u, v) and (v, u) exist in ``g``.
    Node attributes, and the attributes of the kept edges, are carried over to the result
    (when the two directions carry different data, the direction iterated last wins).
    Nodes that end up without any edge are removed, and the result is written to
    ``<out_filename>.graphml``.

    :param g: a networkx digraph.
    :param out_filename: name of the file that will be saved (without the ".graphml" extension).
    :return: a networkx undirected graph.
    """
    # ------- IMPLEMENT HERE THE BODY OF THE FUNCTION ------- #
    out = nx.Graph()
    # Copy nodes together with their attribute dicts, not just the node keys.
    out.add_nodes_from(g.nodes(data=True))

    # Keep an edge only if its reverse also exists in the directed graph.
    out.add_edges_from(
        (u, v, data) for u, v, data in g.edges(data=True) if g.has_edge(v, u)
    )

    # Materialise the isolates first: removing nodes while iterating the generator is unsafe.
    out.remove_nodes_from(list(nx.isolates(out)))

    nx.write_graphml(out, out_filename + ".graphml")
    return out
    # ----------------- END OF FUNCTION --------------------- #


def prune_low_degree_nodes(g: nx.Graph, min_degree: int, out_filename: str) -> nx.Graph:
    """
    Prune a graph by removing nodes with degree < min_degree.

    The input graph is not modified: the pruning is applied to an undirected copy that keeps
    the node and edge attributes (e.g. edge weights) of ``g``. Degrees are evaluated once, before
    any removal; nodes whose degree drops afterwards are kept unless they become isolated, in
    which case they are removed as well. The result is written to ``<out_filename>.graphml``.

    :param g: a networkx graph.
    :param min_degree: lower bound value for the degree.
    :param out_filename: name of the file that will be saved (without the ".graphml" extension).
    :return: a pruned networkx graph.
    """
    # ------- IMPLEMENT HERE THE BODY OF THE FUNCTION ------- #
    out = nx.Graph()
    # Copy nodes and edges together with their attribute dicts.
    out.add_nodes_from(g.nodes(data=True))
    out.add_edges_from(g.edges(data=True))

    # Build the removal list up front so the degree view is not mutated while iterating.
    low_degree_nodes = [node for node, degree in out.degree() if degree < min_degree]
    out.remove_nodes_from(low_degree_nodes)

    # Drop nodes that lost all their neighbours in the step above.
    out.remove_nodes_from(list(nx.isolates(out)))

    nx.write_graphml(out, out_filename + ".graphml")
    return out
    # ----------------- END OF FUNCTION --------------------- #


def prune_low_weight_edges(g: nx.Graph, min_weight=None, min_percentile=None, out_filename: str = None) -> nx.Graph:
    """
    Prune a graph by removing edges with weight < threshold. Threshold can be specified as a value or as a percentile.

    Exactly one of ``min_weight`` and ``min_percentile`` must be given. The input graph is not
    modified: the pruning is applied to a deep copy. Nodes left without edges are removed.

    :param g: a weighted networkx graph (edge attribute "weight").
    :param min_weight: lower bound value for the weight.
    :param min_percentile: lower bound percentile (0-100) for the weight.
    :param out_filename: name of the file that will be saved (without the ".graphml" extension).
        When None, no file is written.
    :return: a pruned networkx graph.
    :raises ValueError: if both or neither of min_weight / min_percentile are given.
    """
    # ------- IMPLEMENT HERE THE BODY OF THE FUNCTION ------- #
    # Both None or both set -> ambiguous request.
    if (min_weight is None) == (min_percentile is None):
        raise ValueError('Use only one parameter: min_weight or min_percentile')

    out = copy.deepcopy(g)

    if min_weight is None:
        weights = [weight for _, _, weight in out.edges.data('weight')]
        # An edgeless graph has no percentile; there is nothing to prune in that case.
        if weights:
            min_weight = np.percentile(weights, min_percentile)

    if min_weight is not None:
        # Collect first, remove afterwards: the edge view must not change during iteration.
        light_edges = [(u, v) for u, v, weight in out.edges.data('weight') if weight < min_weight]
        out.remove_edges_from(light_edges)

    # Drop nodes that lost all their edges.
    out.remove_nodes_from(list(nx.isolates(out)))

    # out_filename is optional (defaults to None), so only write when a name was given.
    if out_filename is not None:
        nx.write_graphml(out, out_filename + ".graphml")
    return out
    # ----------------- END OF FUNCTION --------------------- #


def compute_mean_audio_features(tracks_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute the mean audio features for tracks of the same artist.

    Tracks are grouped by the ('Artist Name', 'Artist ID') pair, so artist names containing
    commas (or any other character) are handled correctly. Artists appear in the result in
    order of first appearance in ``tracks_df``. Missing feature values (NaN) are ignored when
    averaging, and rows whose artist name or ID is missing are left out.

    :param tracks_df: tracks dataframe (with audio features per each track). Must contain the
        columns 'Artist Name', 'Artist ID' and the nine audio feature columns.
    :return: artist dataframe (with mean audio features per each artist), with columns
        'Name', 'ID' followed by the audio feature columns.
    """
    # ------- IMPLEMENT HERE THE BODY OF THE FUNCTION ------- #
    feature_columns = ['Danceability', 'Energy', 'Loudness', 'Speechiness', 'Acousticness',
                       'Instrumentalness', 'Liveness', 'Valence', 'Tempo']
    columns = ['Name', 'ID'] + feature_columns

    # Nothing to aggregate: return an empty table with the expected layout.
    if tracks_df.empty:
        return pd.DataFrame(columns=columns)

    # Cast once so that numeric values stored in object columns are averaged as floats.
    features = tracks_df[feature_columns].astype(float)

    # sort=False keeps the artists in order of first appearance.
    means = features.groupby(
        [tracks_df['Artist Name'], tracks_df['Artist ID']], sort=False
    ).mean()

    table = means.reset_index().rename(columns={'Artist Name': 'Name', 'Artist ID': 'ID'})
    return table[columns]
    # ----------------- END OF FUNCTION --------------------- #


def create_similarity_graph(artist_audio_features_df: pd.DataFrame, similarity: str, out_filename: str = None) -> \
        nx.Graph:
    """
    Create a similarity graph from a dataframe with mean audio features per artist.

    The result is a complete graph: one node per artist (keyed by the 'Name' column, with the
    'ID' column stored as node attribute "id") and one weighted edge per pair of artists.
    Rows sharing the same name collapse into a single node; the last row wins.

    Edge weights:
      * "euclidean": Euclidean distance between the feature vectors (larger = less similar).
      * "cosine": cosine of the angle between the feature vectors (larger = more similar).

    :param artist_audio_features_df: dataframe with mean audio features per artist.
    :param similarity: the name of the similarity metric to use ("cosine" or "euclidean").
    :param out_filename: name of the file that will be saved (without the ".graphml" extension).
        When None, no file is written.
    :return: a networkx graph with the similarity between artists as edge weights.
    :raises ValueError: if ``similarity`` is not one of the supported metrics.
    """
    # ------- IMPLEMENT HERE THE BODY OF THE FUNCTION ------- #
    feature_columns = ['Danceability', 'Energy', 'Loudness', 'Speechiness', 'Acousticness',
                       'Instrumentalness', 'Liveness', 'Valence', 'Tempo']

    # Resolve the metric once, so an unsupported name fails early with a clear message
    # instead of surfacing as an unbound variable inside the edge loop.
    if similarity == 'euclidean':
        def metric(a, b):
            return np.linalg.norm(a - b)
    elif similarity == 'cosine':
        def metric(a, b):
            return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    else:
        raise ValueError("Unsupported similarity metric: %r (use 'euclidean' or 'cosine')" % (similarity,))

    complete = nx.Graph()
    # Feature vectors live in a side table: they are only needed to compute the weights
    # and list-valued node attributes could not be written to GraphML anyway.
    features = {}
    for _, row in artist_audio_features_df.iterrows():
        name = row['Name']
        complete.add_node(name, id=row['ID'])
        features[name] = np.array([row[column] for column in feature_columns], dtype=float)

    for first, second in itertools.combinations(complete, 2):
        # Plain float keeps the edge attribute serialisable regardless of numpy scalar support.
        weight = float(metric(features[first], features[second]))
        complete.add_edge(first, second, weight=weight)

    # out_filename is optional (defaults to None), so only write when a name was given.
    if out_filename is not None:
        nx.write_graphml(complete, out_filename + ".graphml")
    return complete
    # ----------------- END OF FUNCTION --------------------- #


if __name__ == "__main__":
    # ------- IMPLEMENT HERE THE MAIN FOR THIS SESSION ------- #
    # Placeholder: no script entry point is implemented, running this as a script does nothing.
    pass
    # ------------------- END OF MAIN ------------------------ #


### Work zone

In [157]:
# Load the two graphs from their GraphML files and the tracks table from CSV.
gB = nx.read_graphml('gB.graphml')
gD = nx.read_graphml('gD.graphml')
# D is later passed to compute_mean_audio_features, so it is expected to hold per-track audio features.
D =  pd.read_csv("D.csv")

In [158]:
#Exercise 1
# Build the undirected versions of gB and gD that keep only reciprocal edges;
# each call also writes its result to "<name>.graphml".
gB_bi = retrieve_bidirectional_edges(gB, 'gB_bi')
gD_bi = retrieve_bidirectional_edges(gD, 'gD_bi')

In [197]:
#Exercise 2
# TODO(review): would we need separate datasets here, one for gB and one for gD?
mean_audio = compute_mean_audio_features(D)
similarity_graph = create_similarity_graph(mean_audio, 'euclidean', 'similarity_graph')
# Each pruned graph gets its own output file; reusing 'similarity_graph' for all three
# calls made every write overwrite the previous one.
similarity_graph_gB = prune_low_weight_edges(similarity_graph, min_weight=28.5, out_filename='similarity_graph_gB')
similarity_graph_gD = prune_low_weight_edges(similarity_graph, min_weight=24, out_filename='similarity_graph_gD')

In [199]:
# Difference in number of edges between the bidirectional gB graph and its pruned similarity graph.
gB_bi.size() - similarity_graph_gB.size()

18

In [200]:
# Difference in number of edges between the bidirectional gD graph and its pruned similarity graph.
gD_bi.size() - similarity_graph_gD.size()

-22