In [None]:
import json
import heapq
import networkx as nx

In [None]:
#file_path = 'challenge_set.json'
file_path = 'mpd.slice.0-999.json'

#Read the data.
if file_path == 'challenge_set.json':
  with open(file_path, 'r') as file:
      data = json.load(file)
      del data['date']
      del data['version']
      #del data['name']
      del data['description']
elif file_path == 'mpd.slice.0-999.json':
   with open(file_path, 'r') as file:
      data = json.load(file)
      del data['info']



all_playlists = data['playlists'] #Extract the playlists.



non_empty_playlists = [] # PLaylsits with moer than 3 tracks.

tracks = set() #Track nodes
albums = set() #Album nodes
artists = set() #Artist nodes
playlists = set() #Playlists nodes
TP_edges = set() #Track-Playlist edges
TA_edges = set() #Track-Artist edges
TM_edges = set() #TRack-Album edges

# Take the playlists with more than 3 songs.
for current_playlist in all_playlists:
    if len(current_playlist['tracks']) >= 3:
        if 'name' in current_playlist:
          del current_playlist['name']
        for track in current_playlist['tracks']:
          del track['pos']
        non_empty_playlists.append(current_playlist)


#Take the distinct nodes and edges for all type of entities.
for current_playlist in non_empty_playlists:
    playlists.add(current_playlist['pid'])
    for track in current_playlist['tracks']:
      tracks.add(track['track_uri'])
      artists.add(track['artist_uri'])
      albums.add(track['album_uri'])
      TP_edges.add((current_playlist['pid'],track['track_uri']))
      TM_edges.add((track['album_uri'],track['track_uri']))
      TA_edges.add((track['artist_uri'],track['track_uri']))


print(f'Total playlists: {len(all_playlists)}')
print(f'Tracks: {len(tracks)}')
print(f'Albums: {len(albums)}')
print(f'Artists: {len(artists)}')
print(f'Playlists: {len(playlists)}')
print(f'Edges: {len(TP_edges) + len(TM_edges) + len(TA_edges)}')

Total playlists: 1000
Tracks: 34443
Albums: 19261
Artists: 9754
Playlists: 1000
Edges: 135607


In [None]:
het_graph = nx.Graph()
TP_graph = nx.Graph()
TA_graph = nx.Graph()
TM_graph = nx.Graph()

# Heterogeneous Threepartite Graph
het_graph.add_nodes_from(tracks,bipartite=0)
het_graph.add_nodes_from(playlists,bipartite=1)
het_graph.add_nodes_from(albums,bipartite=2)
het_graph.add_nodes_from(artists,bipartite=3)
het_graph.add_edges_from(TP_edges|TA_edges|TM_edges)

# TP Bipartite Graph
TP_graph.add_nodes_from(tracks,bipartite=0)
TP_graph.add_nodes_from(playlists,bipartite=1)
TP_graph.add_edges_from(TP_edges)

# TA Bipartite Graph
TA_graph.add_nodes_from(tracks,bipartite=0)
TA_graph.add_nodes_from(artists,bipartite=1)
TA_graph.add_edges_from(TA_edges)

# TM Bipartite Graph
TM_graph.add_nodes_from(tracks,bipartite=0)
TM_graph.add_nodes_from(albums,bipartite=1)
TM_graph.add_edges_from(TM_edges)

# Projection of het_graph to get graph with tracks only.
T_graph = nx.projected_graph(het_graph, nodes=tracks)


In [None]:
if nx.bipartite.is_bipartite(het_graph):
  print('true')
else :print('false')

if nx.bipartite.is_bipartite(TP_graph):
  print('true')
else :print('false')

if nx.bipartite.is_bipartite(TA_graph):
  print('true')
else :print('false')

if nx.bipartite.is_bipartite(TM_graph):
  print('true')
else :print('false')

true
true
true
true


In [None]:
#Common playlists factor.
def samePlay(t1,t2):
  t1pl = set(TP_graph.neighbors(t1))
  t2pl = set(TP_graph.neighbors(t2))
  return len(t1pl & t2pl)

#Common artists factor.
def sameArtist(t1,t2):
  t1art = set(TA_graph.neighbors(t1))
  t2art = set(TA_graph.neighbors(t2))
  return len(t1art & t2art)

#Common album factor.
def sameAlbum(t1,t2):
  t1al = set(TM_graph.neighbors(t1))
  t2al = set(TM_graph.neighbors(t2))
  return len(t1al & t2al)

T_edges = T_graph.edges()

#Assign weight to edges.
for edge in T_edges:
    edge_weight = samePlay(edge[0],edge[1]) + sameArtist(edge[0],edge[1]) + sameAlbum(edge[0],edge[1])
    T_graph.add_edge(edge[0],edge[1], weight=edge_weight)


In [None]:
#Precision function.
def precision(top_k,secondHalf):
  top_k = set(top_k)
  secondHalf = set(secondHalf)
  return len(top_k & secondHalf)/len(secondHalf)

In [None]:
test_playlists = non_empty_playlists[900:] # Predict the half last 100 playlists.
avgpr = 0 #Average precision.
for i,playlist in enumerate(test_playlists):
  firstHalf = [track['track_uri'] for track in playlist['tracks'][:len(playlist['tracks']) // 2]]
  secondHalf = [track['track_uri'] for track in playlist['tracks'][len(playlist['tracks']) // 2:]]
  personalization_vector = {track : 1.0 for track in firstHalf} #User preference
  personalized_pagerank = nx.pagerank(T_graph, personalization=personalization_vector, weight='weight')
  max_heap = [(score, node) for node, score in personalized_pagerank.items()]
  heapq.heapify(max_heap) #Heap to extract the top_K ranks.
  k = len(secondHalf)
  top_k = heapq.nlargest(k, max_heap)
  top_k_tracks = [t[1] for t in top_k]
  avgpr += precision(top_k_tracks,secondHalf)

avgpr = avgpr/(i+1)
print(avgpr)

0.03280495757394636
