# MuSo Data
MuSo is a supervised machine learning model for predicting user compatibility (degrees of separation) based on their music preferences. This document handles the dataset creation and serialization.

## Initialization

In [1]:
import json
import random
import pylast
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
from pprint import pprint
from skimage.morphology import dilation

RANDOM = False
SEED = 42

if not RANDOM:
    random.seed(SEED)

def load_credentials_json_from_file():
    primary_path = Path("Credentials.json")
    fallback_path = Path("LastFM_Credentials.json")
    if primary_path.exists():
        file_path = primary_path
    else:
        file_path = fallback_path
    with file_path.open('r') as file:
        data = json.load(file)
    if not all(list(data.values())):
        raise ValueError(f"The file '{file_path}' MUST have all values defined.")
    return data

credentials = load_credentials_json_from_file()

try:
    lastfm = pylast.LastFMNetwork(
        api_key=credentials["LASTFM_API_KEY"],
        api_secret=credentials["LASTFM_API_SECRET"],
        username=credentials["LASTFM_USERNAME"],
        password_hash=pylast.md5(credentials["LASTFM_PASSWORD"]),
    )
    print("Connected to LastFM successfully.")
except Exception as e:
    print(f"Failed to connect to LastFM: {str(e)}")

Connected to LastFM successfully.


## Dataset Creation

### User Node Chain

In [2]:
def get_friends_safely(user:pylast.User, limit:int=10):
    """Safely get friends from Last.fm, handling any exceptions."""
    try:
        return list(user.get_friends(limit=limit))
    except Exception as e:
        return []

USE_SAVED_UNC = False
if not USE_SAVED_UNC:
    ROOT_USER_NODE = "xxMYDLSASTERxx"
    MAX_DOS = 100 # Maximum possible degrees of separation
    DUNC_LIM = 300 # Maximum number of friends to request per call in discover_user_node_chain
    
    def discover_user_node_chain(root_user_node:str, max_dos:int=6, limit:int=20):
        user_node_chain = [lastfm.get_user(root_user_node)]
        while len(user_node_chain) <= max_dos:
            friends = get_friends_safely(user_node_chain[-1], limit=limit)
            new_friends = [f for f in friends if f not in user_node_chain]
            if new_friends:
                user_node_chain.append(random.choice(new_friends))
            else:
                print(f"User: {user_node_chain[-1]} has no new friends to continue chain.")
                user_node_chain = user_node_chain[:-1]
        return user_node_chain

    UNC = discover_user_node_chain(ROOT_USER_NODE, MAX_DOS, DUNC_LIM)
    for u in UNC:
        print(u)

else:
    saved_unc = ['xxMYDLSASTERxx','NuMetalFan69','ptveli','cemeteryvamp','Dying___Atheist','don_guraleska','antropogeniczna','orligentia','TainaraBorgir','paulot8','hatsukoi07','Sanne_E','Piotr_GsG','DrkZero','stonehopper1067','synthgal','z-la,lafayet','Dushead','VilEffigy','1stance','flofloozy','ilselo','lorrabbit','acidicmoons','Krappa322','xhelock','ReptiliePangare','Warlee','xowx']
    UNC = []
    for user_str in saved_unc:
        UNC.append(lastfm.get_user(user_str))
UNC

### Degrees of Separation

In [3]:
def get_dos_array(user_node_chain:list[pylast.User], limit=1000):
    unc_len = len(user_node_chain)
    dos_array = np.zeros((unc_len, unc_len), dtype=np.int64)
    for j in tqdm(range(unc_len)):
        friends = get_friends_safely(user_node_chain[j], limit=limit)
        for i in range(unc_len):
            if i == j - 1:
                dos_array[i, j] = 1
            elif i < j - 1:
                dos_array[i, j] = user_node_chain[i] in friends

    dos_array += dos_array.T
    
    cross = np.ones((3, 3), dtype=np.int64)
    cross[0, 0] = 0
    cross[0, 2] = 0
    cross[2, 0] = 0
    cross[2, 2] = 0
    
    last_dilated = dos_array
    n = 2
    while last_dilated.sum() != last_dilated.size:
        dilated = dilation(last_dilated, cross)
        dos_array += (dilated - last_dilated) * n
        last_dilated = dilated
        n += 1

    np.fill_diagonal(dos_array, 0)
    return dos_array


DOS_ARR = get_dos_array(UNC)
UNC_UNS = [u.get_name() for u in UNC]
DOS_DF = pd.DataFrame(DOS_ARR, columns=UNC_UNS, index=UNC_UNS)

dos_df_styled = DOS_DF.style.background_gradient(
    cmap='plasma_r',
    vmin=DOS_DF.min().min(),
    vmax=DOS_DF.max().max(),
)
dos_df_styled

100%|██████████| 29/29 [01:23<00:00,  2.87s/it]


Unnamed: 0,xxMYDLSASTERxx,NuMetalFan69,ptveli,cemeteryvamp,Dying___Atheist,don_guraleska,antropogeniczna,orligentia,TainaraBorgir,paulot8,hatsukoi07,Sanne_E,Piotr_GsG,DrkZero,stonehopper1067,synthgal,"z-la,lafayet",Dushead,VilEffigy,1stance,flofloozy,ilselo,lorrabbit,acidicmoons,Krappa322,xhelock,ReptiliePangare,Warlee,xowx
xxMYDLSASTERxx,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,25,26,27
NuMetalFan69,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,24,25,26
ptveli,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,23,24,25
cemeteryvamp,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,22,23,24
Dying___Atheist,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,21,22,23
don_guraleska,5,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,20,21,22
antropogeniczna,6,5,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,19,20,21
orligentia,7,6,5,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,18,19,20
TainaraBorgir,8,7,6,5,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,17,18,19
paulot8,9,8,7,6,5,4,3,2,1,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,16,17,18


### Obtain User Listening Data

In [4]:
PERIOD = pylast.PERIOD_6MONTHS
GULD_LIM = 100
INCL_TAGS = True
TAGS_ARTISTS_LIM = 10

extract_n_items = lambda d, n: dict(list(d.items())[:n])

def get_user_list_data(
        user_list:list[pylast.User],
        period:str=pylast.PERIOD_6MONTHS,
        artists_limit:int|None=None,
        albums_limit:int|None=None,
        tracks_limit:int|None=None,
        include_tags:bool=False,
        tags_limit:int|None=None,
        tags_artists_limit:int|None=None,
    ):
    extract_name_plays = lambda top_items: {t.item.get_name() : int(t.weight) for t in top_items}
    data = {}
    for user in tqdm(user_list):
        top_artists = user.get_top_artists(period=period, limit=artists_limit)
        top_albums = user.get_top_albums(period=period, limit=albums_limit)
        top_tracks = user.get_top_tracks(period=period, limit=tracks_limit)
        user_name = user.get_name()
        data[user_name] = {}
        data[user_name]["top_artists"] = extract_name_plays(top_artists)
        data[user_name]["top_albums"] = extract_name_plays(top_albums)
        data[user_name]["top_tracks"] = extract_name_plays(top_tracks)
        if include_tags:
            num_artists = 0
            tags = {}
            for top_artist in top_artists:
                top_tags = top_artist.item.get_top_tags()
                artist_weight = int(top_artist.weight)
                for top_tag in top_tags:
                    tag = top_tag.item.get_name()
                    tag_weight = int(top_tag.weight) * artist_weight
                    if tag in tags:
                        tags[tag] += tag_weight
                    else:
                        tags[tag] = tag_weight
                if tags_artists_limit:
                    num_artists += 1
                    if num_artists >= tags_artists_limit:
                        break
            data[user_name]["top_tags"] = extract_n_items(tags, tags_limit)
    return data

def truncate_user_list_data(
        user_list_data:dict[str, dict[str, dict[str, int]]],
        artists_limit:int|None,
        albums_limit:int|None,
        tracks_limit:int|None,
        include_tags:bool=False,
        tags_limit:int|None=None,
    ):
    data = {}
    for user in user_list_data:
        user_data = user_list_data[user]
        data[user] = {
            "top_artists":extract_n_items(user_data["top_artists"], artists_limit),
            "top_albums":extract_n_items(user_data["top_albums"], albums_limit),
            "top_tracks":extract_n_items(user_data["top_tracks"], tracks_limit),
        }
        if include_tags:
            data[user]["top_tags"] = extract_n_items(user_data["top_tags"], tags_limit)
    return data


UNC_DATA = get_user_list_data(UNC, PERIOD, GULD_LIM, GULD_LIM, GULD_LIM, INCL_TAGS, GULD_LIM, TAGS_ARTISTS_LIM)

unc_data_ex = truncate_user_list_data(UNC_DATA, 2, 2, 2, INCL_TAGS, 2)
pprint(unc_data_ex, sort_dicts=False)

 55%|█████▌    | 16/29 [01:33<01:16,  5.85s/it]


WSError: User not found

### Remove Missing Data

In [None]:
def remove_missing_data(user_list_data:dict[str, dict[str, dict[str, int]]]):
    data = {user: data for user, data in user_list_data.items() if all(data.values())}
    return data

UNC_DATA_CLEAN = remove_missing_data(UNC_DATA)
unc_data_clean_ex = remove_missing_data(unc_data_ex)

kept_users = list(UNC_DATA_CLEAN.keys())
DOS_DF_CLEAN = DOS_DF.loc[kept_users, kept_users]

pprint(unc_data_clean_ex, sort_dicts=False)

dos_df_clean_styled = DOS_DF_CLEAN.style.background_gradient(
    cmap='plasma_r',
    vmin=DOS_DF_CLEAN.min().min(),
    vmax=DOS_DF_CLEAN.max().max(),
)
dos_df_clean_styled

## Save the Dataset
Serialization to a json file

In [None]:
DATASET_NAME = "MuSo_User"

with open(f"{DATASET_NAME}_Data.json", 'w') as f:
    json.dump(UNC_DATA_CLEAN, f)

DOS_DF_CLEAN.to_csv(f"{DATASET_NAME}_DOS.csv")