# MuSo Test
MuSo is a supervised machine learning model for predicting user compatibility (degrees of separation) based on their music preferences. This document handles loading the model and running inference tasks.

## Initialization

In [1]:
import json
import random
import pylast
import numpy as np
from numpy.typing import NDArray
from pathlib import Path
from tqdm import tqdm

def load_json_from_file():
    primary_path = Path("Credentials.json")
    fallback_path = Path("LastFM_Credentials.json")
    if primary_path.exists():
        file_path = primary_path
    else:
        file_path = fallback_path
    with file_path.open('r') as file:
        data = json.load(file)
    if not all(list(data.values())):
        raise ValueError(f"The file '{file_path}' MUST have all values defined.")
    return data

credentials = load_json_from_file()

try:
    lastfm = pylast.LastFMNetwork(
        api_key=credentials["LASTFM_API_KEY"],
        api_secret=credentials["LASTFM_API_SECRET"],
        username=credentials["LASTFM_USERNAME"],
        password_hash=pylast.md5(credentials["LASTFM_PASSWORD"]),
    )
    print("Connected to LastFM successfully.")
except Exception as e:
    print(f"Failed to connect to LastFM: {str(e)}")


with open("MuSo_User_Data.json", "r") as f:
    USER_DATA:dict = json.load(f)
    
if "top_tags" in list(USER_DATA.values())[0]:
    INCL_TAGS = True

Connected to LastFM successfully.


In [2]:
PERIOD = pylast.PERIOD_6MONTHS
GULD_LIM = 100
TAGS_ARTISTS_LIM = 10

extract_n_items = lambda d, n: dict(list(d.items())[:n])

def get_user_list_data(
        user_list:list[str],
        period:str=pylast.PERIOD_6MONTHS,
        artists_limit:int|None=None,
        albums_limit:int|None=None,
        tracks_limit:int|None=None,
        include_tags:bool=False,
        tags_limit:int|None=None,
        tags_artists_limit:int|None=None,
        cache = True,
    ):
    cache_path = Path("User_Data_Cache.json")
    if cache:
        if cache_path.exists():
            with cache_path.open("r") as f:
                cached_udata = json.load(f)
        else:
            dataset_path = Path("MuSo_User_Data.json")
            if dataset_path.exists():
                with dataset_path.open("r") as f:
                    cached_udata = json.load(f)
            else:
                cached_udata = {}
        data = {u:udata for u, udata in cached_udata.items() if u in user_list}
        user_list = [u for u in user_list if u not in cached_udata]
    else:
        if cache_path.exists():
            cache_path.unlink()
        data = {}

    extract_name_plays = lambda top_items: {t.item.get_name() : int(t.weight) for t in top_items}
    for user in tqdm(user_list):
        user = lastfm.get_user(user)
        top_artists = user.get_top_artists(period=period, limit=artists_limit)
        top_albums = user.get_top_albums(period=period, limit=albums_limit)
        top_tracks = user.get_top_tracks(period=period, limit=tracks_limit)
        user_name = user.get_name()
        data[user_name] = {}
        data[user_name]["top_artists"] = extract_name_plays(top_artists)
        data[user_name]["top_albums"] = extract_name_plays(top_albums)
        data[user_name]["top_tracks"] = extract_name_plays(top_tracks)
        if include_tags:
            num_artists = 0
            tags = {}
            for top_artist in top_artists:
                top_tags = top_artist.item.get_top_tags()
                artist_weight = int(top_artist.weight)
                for top_tag in top_tags:
                    tag = top_tag.item.get_name()
                    tag_weight = int(top_tag.weight) * artist_weight
                    if tag in tags:
                        tags[tag] += tag_weight
                    else:
                        tags[tag] = tag_weight
                if tags_artists_limit:
                    num_artists += 1
                    if num_artists >= tags_artists_limit:
                        break
            data[user_name]["top_tags"] = extract_n_items(tags, tags_limit) 
    if cache:
        cached_udata.update(remove_missing_data(data))
        with cache_path.open("w") as f:
            json.dump(cached_udata, f)
    return data

def remove_missing_data(user_list_data:dict[str, dict[str, dict[str, int]]]):
    data = {user: data for user, data in user_list_data.items() if all(data.values())}
    return data


In [3]:
def replace_with_user_plays(all_item_list:list, udata:dict, user:str, top_item:str):
    user_value_list = []
    for item in all_item_list:
        if item in udata[user][top_item]:
            user_value_list.append(udata[user][top_item][item])
        else:
            user_value_list.append(0)
    return np.array(user_value_list)

In [4]:
def get_unique_top_items(data:dict, top_item:str, shuffled:bool=False)->list[str]:
    all_items = []
    for user in data:
        all_items += list(data[user][top_item].keys())
        
    all_unique_items = list(set(all_items))
    if shuffled:
        random.shuffle(all_unique_items)
    else:
        all_unique_items.sort()
    return all_unique_items

def get_all_unique_top_items(data:dict, include_tags:bool=False):
    do_shuffle = False
    all_artists = get_unique_top_items(data, "top_artists", shuffled=do_shuffle)
    all_albums = get_unique_top_items(data, "top_albums", shuffled=do_shuffle)
    all_tracks = get_unique_top_items(data, "top_tracks", shuffled=do_shuffle)
    if include_tags:
        all_tags = get_unique_top_items(data, "top_tags", shuffled=do_shuffle)
        random.shuffle(all_tags)
    else:
        all_tags = None
    return all_artists, all_albums, all_tracks, all_tags

def normalize(a:NDArray[np.int64]):
    return (a - a.mean()) / a.std()

def get_user_vector(user1:str, user2:str):
    all_artists, all_albums, all_tracks, all_tags = get_all_unique_top_items(USER_DATA, INCL_TAGS)
    ulist = [user1, user2]
    udata = get_user_list_data(ulist, PERIOD, GULD_LIM, GULD_LIM, GULD_LIM, INCL_TAGS, GULD_LIM, TAGS_ARTISTS_LIM)
    udata = remove_missing_data(udata)
    assert user1 in udata and user2 in udata
    u1_all_artists_vec = normalize(replace_with_user_plays(all_artists, udata, user1, "top_artists"))
    u1_all_albums_vec = normalize(replace_with_user_plays(all_albums, udata, user1, "top_albums"))
    u1_all_tracks_vec = normalize(replace_with_user_plays(all_tracks, udata, user1, "top_tracks"))
    u2_all_artists_vec = normalize(replace_with_user_plays(all_artists, udata, user2, "top_artists"))
    u2_all_albums_vec = normalize(replace_with_user_plays(all_albums, udata, user2, "top_albums"))
    u2_all_tracks_vec = normalize(replace_with_user_plays(all_tracks, udata, user2, "top_tracks"))
    artist_vec = normalize(np.hstack([u1_all_artists_vec, u2_all_artists_vec]))
    album_vec = normalize(np.hstack([u1_all_albums_vec, u2_all_albums_vec]))
    track_vec = normalize(np.hstack([u1_all_tracks_vec, u2_all_tracks_vec]))
    vec_list = [artist_vec, album_vec, track_vec]
    if INCL_TAGS:
        assert all_tags
        u1_all_tags_vec = normalize(replace_with_user_plays(all_tags, udata, user1, "top_tags"))
        u2_all_tags_vec = normalize(replace_with_user_plays(all_tags, udata, user2, "top_tags"))
        tag_vec = normalize(np.hstack([u1_all_tags_vec, u2_all_tags_vec]))
        vec_list.append(tag_vec)
    #random.shuffle(vec_list)
    return normalize(np.hstack(vec_list)).astype(np.float32)

In [5]:
import torch.nn as nn
import torch.nn.functional as F

class SeparationModel(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(SeparationModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 8192)
        self.fc2 = nn.Linear(8192, 4096)
        self.fc3 = nn.Linear(4096, 2048)
        self.fc4 = nn.Linear(2048, 1024)
        self.fc5 = nn.Linear(1024, 512)
        self.fc6 = nn.Linear(512, 256)
        self.fc7 = nn.Linear(256, 128)
        self.fc8 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = F.relu(self.fc4(x))
        x = self.dropout(x)
        x = F.relu(self.fc5(x))
        x = self.dropout(x)
        x = F.relu(self.fc6(x))
        x = self.dropout(x)
        x = F.relu(self.fc7(x))
        x = self.dropout(x)
        x = self.fc8(x)
        return x

In [6]:
import torch

MODEL_PATH = "Muso.pth"
DEVICE = "cuda"

device = torch.device(DEVICE)

def load_model(model_path):
    model = torch.load(model_path)
    model.to(device)
    model.eval()
    return model

def perform_inference(model, input_tensor):
    with torch.no_grad():
        logits = model(input_tensor)
        probabilities = torch.nn.functional.softmax(logits, dim=1)
    return probabilities, logits

MODEL = load_model(MODEL_PATH)

In [7]:
def predict_dos(user1, user2):
    user_vec = get_user_vector(user1, user2)
    print(user_vec.sum())
    assert len(user_vec) == 18432
    user_tensor = torch.from_numpy(user_vec).unsqueeze(0).to(device)
    probabilities, logits = perform_inference(MODEL, user_tensor)
    _, predicted_class = torch.max(logits, dim=1)
    print("Predicted Degrees of Separation:", predicted_class.item() + 1)
    print("Class probabilities:", probabilities)

predict_dos("fshnoeyes", "larrywalker27")

0it [00:00, ?it/s]

7.6293945e-06
Predicted Degrees of Separation: 1
Class probabilities: tensor([[9.0069e-01, 3.2363e-02, 6.2915e-02, 2.3379e-03, 1.4492e-04, 5.5647e-05,
         6.1722e-06, 1.4608e-03, 2.6969e-06, 7.3415e-06, 5.5930e-06, 5.7562e-06,
         2.6982e-07, 3.9838e-07, 2.3592e-08, 4.4732e-08, 2.3735e-08, 5.9361e-07,
         8.9828e-09, 1.1709e-08, 8.1041e-09, 6.0013e-09, 2.6355e-09, 4.1665e-09,
         3.4785e-09, 6.9237e-10, 9.9061e-09, 7.3812e-09, 8.1792e-09, 1.3015e-08,
         2.9252e-09, 2.5181e-08, 1.6111e-08, 1.9521e-07, 6.0286e-08, 5.1797e-08,
         1.0877e-08, 1.1069e-08, 6.3487e-10, 3.2721e-09, 4.8429e-11]],
       device='cuda:0')



