# Demo music query

We perform the following in order to generate recommendation.

1. Load an embeddings and cluster model.
2. Query by specifying song title and any metadata to condition.
3. Get lyrics through an API.
    - First with [this API](http://www.chartlyrics.com/api.aspx), as it's free and does not require an API key.
    - Otherwise fall back on [this API](https://github.com/johnwmillr/LyricsGenius) to access Genius. **Note you will need an API key which can create [here](https://genius.com/api-clients).**
4. Get Spotify acoustic features and metadata with [this API](https://spotipy.readthedocs.io/en/2.19.0/). **Note you will need a client ID and secret key which can create [here](https://developer.spotify.com).**
5. Return top K recommendations by:
    - Computing embedding.
    - Identifying corresponding cluster.
    - Subset based on query.
    
First some imports.

In [1]:
import urllib.request
import json
import numpy as np
import time
import lyricsgenius
import re
from tqdm import tqdm
import pandas as pd
import xml.etree.ElementTree as ET
from pprint import pprint
import os
import spotipy
import torch
from sklearn.metrics.pairwise import cosine_distances, euclidean_distances
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import calinski_harabasz_score, davies_bouldin_score, silhouette_score
from spotipy.oauth2 import SpotifyClientCredentials
from sentence_transformers import SentenceTransformer
from transformers import AutoConfig, AutoModel,AutoModelForPreTraining, AutoTokenizer
import unicodedata


def strip_accents(s):
    return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

def get_token(token_name, token_path="tokens.json"):
    TOKEN = None
    if os.environ.get(token_name):
        TOKEN = os.environ.get(token_name)
    elif os.path.isfile(token_path):
        f = open(token_path)
        data = json.loads(f.read())
        TOKEN = data[token_name]
    else:
        assert TOKEN is not None, f"No value for {token_name}."
    return TOKEN


def standardize_lyrics(lyrics, i=0, verbose=False):
    if verbose:
        print(i)
    if lyrics is np.nan or len(lyrics) == 0:
        return np.nan
    
    # remove new lines
    clean = lyrics.replace("\\n\\n", ". ").replace("\\n", ". ").replace("\\", "")
    
    # remove square brackets around lyrics
    # if possible, extract chorus, pre-chorus, post-chorus, bridge, verses
    song_parts = ["Chorus", "Pre-Chorus", "Post-Chorus", "Bridge", "Verse 1", "Verse 2", "Verse 3", "Verse 4"]
    if verbose:
        for part in song_parts:
            text = find_between(clean, f"[{part}]. ", "[")
            if len(text):
                print(f"\n{part} : {text}")
    
    for part in song_parts:
        clean = clean.replace(f"[{part}]. ", "")
        
        
    # remove anything else in square brackets
    clean = re.sub("[\[].*?[\]]", "", clean)
    
    # clean up
    clean = clean.replace('"', "")
    try:
        while clean[0] == "." or clean[0] == " " or clean[0] == "'":
            clean = clean[1:]
    except:
        return np.nan
    try:
        if clean[-1] == "'":
            clean = clean[:-1]
    except:
        return np.nan
    
    clean = clean.strip().replace("\n", " ")
        
    return clean


def get_embeddings(model_str, lyrics, embedding_fp=None):
    
    if model_str.endswith('.npy'):
        # already computed embeedings
        print("getting precomputed embeddings")
        corpus_embeddings = np.load(model_str)
        
        
    else:
        # load model
        if os.path.isdir(model_str):
            config = AutoConfig.from_pretrained(f'{model_str}/config.json')
            model = AutoModel.from_config(config)
            model = AutoModel.from_pretrained(f'{model_str}/pytorch_model.bin',config=config)
            model.eval()
            model.cuda()
            tokenizer = AutoTokenizer.from_pretrained(model_str, use_fast=True)
        else:
            model = SentenceTransformer(model_str)

        # file to save embedding
        if embedding_fp is None:
            if os.path.isdir(model_str):
                embedding_fp = f"{os.path.split(model_str)[0]}_embeddings.pt"
            else:
                embedding_fp = f"{model_str}_embeddings.pt"
            embedding_fp = os.path.join("embeddings", embedding_fp)
            print(embedding_fp)

        # load or compute embeddings
        if os.path.exists(embedding_fp):
            print("loading already computed embeddings")
            corpus_embeddings = torch.load(embedding_fp)
            if torch.is_tensor(corpus_embeddings):
                corpus_embeddings = corpus_embeddings.cpu().data.numpy()
        else:
            start_time = time.time()
            if os.path.isdir(model_str):
                tokens = tokenizer.batch_encode_plus(
                    song_lyrics,
                    max_length = 512,
                    padding=True,
                    truncation=True
                )


                embed = []
                with torch.no_grad():
                    for i in tqdm(range(len( df_clean['lyrics']))):
                        tkin = tokens['input_ids'][i:i+1]
                        tkam = tokens['attention_mask'][i:i+1]

                        tkin = torch.tensor(tkin).cuda()
                        tkam = torch.tensor(tkam).cuda()

                        out = model(tkin,tkam)['last_hidden_state']
                        out = out.mean(1).cpu().numpy()

                        embed.append(out)
                corpus_embeddings = np.vstack(embed)

            else:
                corpus_embeddings = model.encode(song_lyrics)
                corpus_embeddings = corpus_embeddings.cpu().data.numpy()
            proc_time = time.time() - start_time
            print(f"Time for computing embeddings : {proc_time} seconds")
            print(f"{proc_time / num_sentences} seconds per song")
            torch.save(corpus_embeddings, embedding_fp)
        
    # normalize
    embedding_norms = np.linalg.norm(corpus_embeddings, axis=1, keepdims=True)
    corpus_embeddings = corpus_embeddings /  embedding_norms


    return corpus_embeddings


def compute_single_embedding(embeddings_model, lyrics):
    if embeddings_model.endswith('.npy'):
        # already computed embeedings
        corpus_embeddings = np.load(embeddings_model)
        song_id = df_query.iloc[0].name
        idx = df_clean.index.values == song_id
        query_embed = corpus_embeddings[idx][0]
    elif not os.path.isdir(embeddings_model):
        # coming from model hub
        model = SentenceTransformer(embeddings_model)
        query_embed = model.encode(lyrics)
    else:
        # local model
        config = AutoConfig.from_pretrained(f'{embeddings_model}/config.json')
        model = AutoModel.from_config(config)
        model = AutoModel.from_pretrained(f'{embeddings_model}/pytorch_model.bin',config=config)
        model.eval()
        model.cuda()
        tokenizer = AutoTokenizer.from_pretrained(embeddings_model, use_fast=True)

        # TODO : simpler for a single lyric?
        tokens = tokenizer.batch_encode_plus(
            [lyrics],
            max_length = 512,
            padding=True,
            truncation=True
        )
        embed = []
        with torch.no_grad():
            for i in tqdm(range(len([lyrics]))):
                tkin = tokens['input_ids'][i:i+1]
                tkam = tokens['attention_mask'][i:i+1]

                tkin = torch.tensor(tkin).cuda()
                tkam = torch.tensor(tkam).cuda()

                out = model(tkin,tkam)['last_hidden_state']
                out = out.mean(1).cpu().numpy()

                embed.append(out)
        query_embed = embed[0][0]

    # normalize
    query_embed = query_embed /  np.linalg.norm(query_embed)
    return query_embed


def get_cluster_assignment(model_str, corpus_embeddings, n_cluster, affinity, linkage, clustering_fp=None, print_metrics=False):
    
    # file to save cluster assignment
    if clustering_fp is None:
        if os.path.isdir(model_str):
            clustering_fp = os.path.split(model_str)[0]
        elif model_str.endswith('.npy'):
            clustering_fp = os.path.basename(model_str).split(".")[0]
        else:
            clustering_fp = model_str
        clustering_fp = clustering_fp + f"_{n_cluster}clusters_affinity={affinity}_linkage={linkage}"
        clustering_fp = os.path.join("clustering", clustering_fp)
        clustering_fp_npz = clustering_fp + ".npz"
        print(clustering_fp_npz)
        # backwards compatibility
        clustering_fp_npy = clustering_fp + ".npy"
    else:
        assert clustering_fp.endswith('.npz')
        clustering_fp_npz = clustering_fp
    
    # load or compute cluster assignment
    ch = None
    db = None
    sh = None
            
    if os.path.exists(clustering_fp_npz):
        print("loading already computed cluster assignment")
        data = np.load(clustering_fp_npz)
        cluster_assignment = data["cluster_assignment"]
        ch = float(data["ch"])
        db = float(data["db"])
        sh = float(data["sh"])
        
    elif os.path.exists(clustering_fp_npy):
        print("loading already computed cluster assignment")
        cluster_assignment = np.load(clustering_fp_npy)
        
    else:
        print("start clustering")
        start_time = time.time()
        clustering_model = AgglomerativeClustering(n_clusters=n_cluster, affinity=affinity, linkage=linkage, distance_threshold=None)
        clustering_model.fit(corpus_embeddings)
        cluster_assignment = clustering_model.labels_
        proc_time = time.time() - start_time
        print(f"clustering time : {proc_time} seconds")
        
    if ch is None:
        # compute metrics
        print("computing metrics")
        ch = calinski_harabasz_score(corpus_embeddings, cluster_assignment)
        db = davies_bouldin_score(corpus_embeddings, cluster_assignment)
        sh = silhouette_score(corpus_embeddings, cluster_assignment)
        
        # save everything
        print(clustering_fp_npz)
        np.savez(
            clustering_fp_npz, 
            cluster_assignment=cluster_assignment, 
            ch=ch, db=db, sh=sh
        )
        
    if print_metrics:
        print("\ncalinski_harabasz_score : ", ch)
        print("davies_bouldin_score : ", db)
        print("silhouette_score : ", sh)
        
    return cluster_assignment, ch, db, sh


def subset_conditions(df, embed, cond, song_meta):
    embed_sub = embed.copy()
    df_sub = df.copy()
    for _key in cond:
        if cond[_key]:
#             print(_key)
            if _key == "genre":
                ind_genre = df_sub["genre"] == cond[_key]
                embed_sub = embed_sub[ind_genre]
                df_sub = df_sub[ind_genre]
            elif _key == "mode" or _key == "explicit":
                if cond[_key] == -1:
                    _idx = df_sub[_key] == 0
                elif cond[_key] == 1:
                    _idx = df_subset[_key] == 1
                embed_sub = embed_sub[_idx]
                df_sub = df_sub[_idx]
            else: 
                if cond[_key] >= 0:
                    _ind = df_sub[_key] > song_meta[_key]
                else:
                    _ind = df_sub[_key] < song_meta[_key]
                embed_sub = embed_sub[_ind]
                df_sub = df_sub[_ind]
    return df_sub, embed_sub





# 1) specify query

In [2]:
"""
Can write a condition dictionary like below. Don't need all entries if have no preference.

conditions = {
    "genre": None,    # ['dance pop', 'acoustic/folk', 'hip-hop/rap', 'pop', 'soul/disco', 'country', 'r&b', 'rock']
    "mode": 0,   # -1 for minor, 1 for major, 0 for no preference
    "explicit": 0,   # -1 for clean, 1 for explicit, 0 for no preference
    # positive for more, 0 for no preference, negative for less
    'acousticness': 0,
    'danceability': 0,
    'energy': 0,
    'instrumentalness': 0,
    'liveness': 0,
    'loudness': 0,
    'popularity': 0,
    'release_year': 0,
    'speechiness': 0,
    'tempo': 0,
    'valence': 0
}

SimSiam models rely on v3 of the dataset, which have the following genre.
Genres for v3: ['rock', 'country', 'dance pop', 'soul', 'acoustic/folk', 'hip-hop/rap', 'r&b', 'alternative/punk', 'disco/house', 'pop', 'adult standards']

IF ENCODER IS NOT AVAILABLE YOU CANNOT QUERY OUTSIDE OF DATABASE (e.g. SimSiam).
"""

""" SPECIFY MODEL (from hub or local path) OR PRECOMPUTED EMBEDDINGS """
# from model hub or local path
embeddings_model = "all-mpnet-base-v2"
embeddings_model = "all-mpnet-base-v2-finetuned-genre_unfrozen_base-checkpoint-1735/checkpoint-1735"
# embeddings_model = "mpnet-genre-valence-finetuned/checkpoint-1041"
# embeddings_model = "mpnet-genre-valence-energy-finetuned/checkpoint-694"
# embeddings_model = "mpnet-genre-valence-energy-danceability-finetuned/checkpoint-1041"
# embeddings_model = "mpnet-genre-valence-energy-danceability-genre-finetuned/checkpoint-1041"


# embeddings_model = "embeddings/simsiam_mpnet.npy"    # embeddings nearly identical...
# embeddings_model = "embeddings/simsiam_distilroberta.npy"   # embeddings nearly identical...


# """ SPECIFY QUERY """
# # country love song
# song_title = "all i want for christmas is you"
# artist = "mariah carey"
# conditions = {
#     "genre": 'country', 
#     "valence": 0
# }

# # more upbeat, hip-hop song about perserverance
# song_title = "we are the champions"
# artist = "queen"
# conditions = {
#     "genre": 'hip-hop/rap', 
#     "energy": 1,
#     "valence": 1,
#     "danceability" :1
# }


# dance pop version of creep that is not explicit
song_title = "This Christmas"
artist = "Chris Brown"
conditions = {
#     "genre": 'country',    
#     "explicit": -1,
#     "valence": 1
}

# 2) get lyrics

- http://www.chartlyrics.com/api.aspx
- https://github.com/johnwmillr/LyricsGenius

For last approach, you need an [API token](https://genius.com/api-clients) and add it to your environment variables:
```
export GENIUS_ACCESS_TOKEN="my_access_token_here"
```

In [3]:
GENIUS_ACCESS_TOKEN = get_token("GENIUS_ACCESS_TOKEN")

In [4]:
lyrics = None
song_metadata = None
query_embed = None

# first query database
if embeddings_model == "embeddings/simsiam_mpnet.npy" or embeddings_model == "embeddings/simsiam_distilroberta.npy":
    print("using v3 dataset")
    df_clean = pd.read_pickle("df_clean_v3_13122021_py35.pkl")   # for SIAM models
    df_songs = np.array([strip_accents(a.lower()) for a in df_clean["song_name"].values])
    df_query = df_clean[df_songs == song_title.lower()]
else:
    df_clean = pd.read_pickle("df_clean_v4_14122021_py35.pkl")
    df_songs = np.array([strip_accents(a.lower()) for a in df_clean["song_name"].values])
    df_query = df_clean[df_songs == song_title.lower()]
if artist is not None:
    df_artists = np.array([strip_accents(a.lower()) for a in df_query["artist"].values])
    df_query = df_query[df_artists == artist.lower()]
if len(df_query) == 1:
    print("Found song in database!")
    lyrics = df_query["lyrics"].values[0]
    song_metadata = df_query.iloc[0].to_dict()
    
if lyrics is None and embeddings_model.endswith('.npy'):
    raise ValueError("Didn't find lyrics in database and won't be able to compute embedding without the model (as you've provided precomputed embeddings)")

if lyrics is None:
    if artist is not None:
        print("Trying api.chartlyrics.com")
        start_url = f"http://api.chartlyrics.com/apiv1.asmx/SearchLyricDirect?artist={strip_accents(artist)}&song={song_title}"
        url = start_url.replace(" ","%20")
        contents = urllib.request.urlopen(url).read()
        root = ET.fromstring(contents.decode("utf-8"))
        for child in root:
            tag = child.tag.split("}")[1]
            if tag == "Lyric":
                lyrics = child.text
            if tag == "LyricSong":
                rec_song = strip_accents(child.text.lower())
            if tag == "LyricArtist":
                rec_artist = strip_accents(child.text.lower())
        if rec_song != strip_accents(song_title.lower()) or rec_artist != strip_accents(artist.lower()):
            lyrics = None
        if lyrics is not None:
            lyrics = lyrics.strip().replace("\n", " ")

    if lyrics is None and GENIUS_ACCESS_TOKEN:
        # use Genius API
        print("Using Genius...")
        genius = lyricsgenius.Genius(GENIUS_ACCESS_TOKEN)
        song = genius.search_song(song_title, artist)
        lyrics = standardize_lyrics(song.lyrics)
        lyrics = ' '.join(lyrics.split(' ')[:-1])[:-13]   # remove last part Genius adds
        print(f"Received song : {song.full_title}")
    #     if song.full_title.lower() != song_title.lower() or song.artist.lower() != artist.lower():
    #         lyrics = None
    
    
if lyrics is None:
    raise ValueError("Could not find song.")
    
print("\nLYRICS :", lyrics)

Found song in database!

LYRICS : Hang all the mistletoe. I'm gonna get to know you better. This Christmas. And as we trim the tree. How much fun it's gonna be together. This Christmas. The fireside is blazing bright, woah-oh-oh. We're caroling through the night, woah-oh-oh. This Christmas will be. A very special Christmas for me. Woah-oh-oh-woah-oh-oh-oh. Ha, let's go (Woo!). Presents and cards are here. My world is filled with cheer and you. This Christmas. And as I look around. Your eyes outshine the town, they do. This Christmas. The fireside is blazing bright. And we're caroling through the night. And this Christmas will be. A very special Christmas for me, yeah, huh, woah-oh-oh. Oh, haha, shake a hand, shake a hand now. Na-na-na-na-na-na. . Ooh, the fireside is blazing bright. And we're caroling through the night. And this Christmas will be. So special for you and me, yes, it will be. Woah-oh-oh, ha. Shake a hand now. Come on, everybody shake a hand now, ha. Family, hey. We'll be

# 3) get spotipy metadata and features

be sure to have credentials from [here](https://developer.spotify.com) and save them as environment variables.
```
export SPOTIPY_CLIENT_ID='your-spotify-client-id'
export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret'
```

In [5]:
if song_metadata is None:
    print("Getting song metadata from Spotify...\n")
    SPOTIPY_CLIENT_ID = get_token("SPOTIPY_CLIENT_ID")
    SPOTIPY_CLIENT_SECRET = get_token("SPOTIPY_CLIENT_SECRET")

    auth_manager = SpotifyClientCredentials(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET)
    sp = spotipy.Spotify(auth_manager=auth_manager)

    # search for song, https://developer.spotify.com/documentation/web-api/reference/#/operations/search
    query = f"track:{song_title}"
    if artist is not None:
        query += f" artist:{artist}"
    res = sp.search(q=query, type='track')

    # take top entry
    _id = 0
    rx_song = res["tracks"]["items"][0]["name"]
    rx_artists = [artist["name"] for artist in res["tracks"]["items"][0]["artists"]]
    print(f"{rx_song} by {rx_artists}\n")

    song_metadata = dict()
    song_metadata["release_year"] = int(res["tracks"]["items"][_id]["album"]["release_date"][:4])
    song_metadata["popularity"] = res["tracks"]["items"][_id]["popularity"]
    song_metadata["explicit"] = res["tracks"]["items"][_id]["explicit"]

    # get acoustic features
    acoustic_features = ["mode", "acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "speechiness", "valence", "tempo"]
    uri = res["tracks"]["items"][_id]["uri"]
    feat_results = sp.audio_features(uri)[0]
    for _feat in acoustic_features:
        song_metadata[_feat] = feat_results[_feat]

else:
    print("Got song metadata from database!\n")
        
pprint(song_metadata)

Got song metadata from database!

{'acousticness': 0.384,
 'artist': 'Chris Brown',
 'danceability': 0.40700000000000003,
 'energy': 0.647,
 'explicit': 0,
 'genre': 'dance pop',
 'instrumentalness': 0.0,
 'liveness': 0.0862,
 'loudness': -6.031000000000001,
 'lyrics': "Hang all the mistletoe. I'm gonna get to know you better. This "
           "Christmas. And as we trim the tree. How much fun it's gonna be "
           'together. This Christmas. The fireside is blazing bright, '
           "woah-oh-oh. We're caroling through the night, woah-oh-oh. This "
           'Christmas will be. A very special Christmas for me. '
           "Woah-oh-oh-woah-oh-oh-oh. Ha, let's go (Woo!). Presents and cards "
           'are here. My world is filled with cheer and you. This Christmas. '
           'And as I look around. Your eyes outshine the town, they do. This '
           "Christmas. The fireside is blazing bright. And we're caroling "
           'through the night. And this Christmas will be.

# 4) return top K recommendations

specify clustering param

In [6]:
n_clusters = 5
affinity = "cosine"     # “euclidean”, “l1”, “l2”, “manhattan”, “cosine”, or “precomputed”. If linkage is “ward”, only “euclidean” is accepted
linkage = "complete"    # {‘ward’, ‘complete’, ‘average’, ‘single’}, default=’ward’
# affinity = "euclidean"
# linkage = "ward"

first compute embedding

In [7]:
if embeddings_model.endswith('.npy'):
    # already computed embeedings
    corpus_embeddings = np.load(embeddings_model)
    song_id = df_query.iloc[0].name
    idx = df_clean.index.values == song_id
    query_embed = corpus_embeddings[idx][0]
elif not os.path.isdir(embeddings_model):
    # coming from model hub
    model = SentenceTransformer(embeddings_model)
    query_embed = model.encode(lyrics)
else:
    # local model
    config = AutoConfig.from_pretrained(f'{embeddings_model}/config.json')
    model = AutoModel.from_config(config)
    model = AutoModel.from_pretrained(f'{embeddings_model}/pytorch_model.bin',config=config)
    model.eval()
    model.cuda()
    tokenizer = AutoTokenizer.from_pretrained(embeddings_model, use_fast=True)
    
    # TODO : simpler for a single lyric?
    tokens = tokenizer.batch_encode_plus(
        [lyrics],
        max_length = 512,
        padding=True,
        truncation=True
    )
    embed = []
    with torch.no_grad():
        for i in tqdm(range(len([lyrics]))):
            tkin = tokens['input_ids'][i:i+1]
            tkam = tokens['attention_mask'][i:i+1]

            tkin = torch.tensor(tkin).cuda()
            tkam = torch.tensor(tkam).cuda()

            out = model(tkin,tkam)['last_hidden_state']
            out = out.mean(1).cpu().numpy()

            embed.append(out)
    query_embed = embed[0][0]

# normalize
query_embed = query_embed /  np.linalg.norm(query_embed)

print(query_embed.shape)

Some weights of the model checkpoint at all-mpnet-base-v2-finetuned-genre_unfrozen_base-checkpoint-1735/checkpoint-1735/pytorch_model.bin were not used when initializing MPNetModel: ['classifier.dense.bias', 'classifier.out_proj.bias', 'classifier.dense.weight', 'classifier.out_proj.weight']
- This IS expected if you are initializing MPNetModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing MPNetModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of MPNetModel were not initialized from the model checkpoint at all-mpnet-base-v2-finetuned-genre_unfrozen_base-checkpoint-1735/checkpoint-1735/pytorch_model.bin and are newly initialized: ['mpnet.pooler.dense.bias', 'mpnet.pooler.dense.we

(768,)





identify corresponding cluster

In [8]:
if os.path.isdir(embeddings_model):
    clustering_fp = os.path.split(embeddings_model)[0]
elif embeddings_model.endswith('.npy'):
    clustering_fp = os.path.basename(embeddings_model).split(".")[0]
else:
    clustering_fp = embeddings_model
    
# TODO : could be npz file with scores...
clustering_fp += f"_{n_clusters}clusters_affinity={affinity}_linkage={linkage}.npz"
clustering_fp = os.path.join("clustering", clustering_fp)
print(clustering_fp)

# check if clustering already exists
song_lyrics = df_clean['lyrics'].tolist()
if os.path.isfile(clustering_fp):
    data = np.load(clustering_fp)
    cluster_assignment = data["cluster_assignment"]
    ch = float(data["ch"])
    db = float(data["db"])
    sh = float(data["sh"])
    
else:
    # compute with clustering notebook
    print("Cluster assignment not available. Computing...")
    embeddings = get_embeddings(embeddings_model, song_lyrics)
    cluster_assignment, ch, db, sh = get_cluster_assignment(
        embeddings_model, embeddings, n_clusters, affinity, linkage
    )
    
print("\ncalinski_harabasz_score : ", ch)
print("davies_bouldin_score : ", db)
print("silhouette_score : ", sh)
    
    
# compute centroids
# -- load embeddings
# corpus_embeddings = get_embeddings(embeddings_model, song_lyrics)

if embeddings_model.endswith('.npy'):
    # already computed embeedings
    corpus_embeddings = np.load(embeddings_model)
else:
        
    if os.path.isdir(embeddings_model):
        embedding_fp = f"{os.path.split(embeddings_model)[0]}_embeddings.pt"
    else:
        embedding_fp = f"{embeddings_model}_embeddings.pt"
    embedding_fp = os.path.join("embeddings", embedding_fp)
    assert os.path.isfile(embedding_fp)
    corpus_embeddings = torch.load(embedding_fp)
    if torch.is_tensor(corpus_embeddings):
        corpus_embeddings = corpus_embeddings.cpu().data.numpy()

assert len(corpus_embeddings) == len(cluster_assignment)

# -- average according to cluster assignment
centroids = []
for i in range(n_clusters):
    inds = cluster_assignment == i
    centroids.append(np.mean(corpus_embeddings[inds,:], axis=0))
centroids = np.vstack(centroids)

# identify closest cluster according to correct metric
query_embed = query_embed / np.linalg.norm(query_embed, axis=-1, keepdims=True)

if affinity == "cosine":
    scores = cosine_distances(query_embed[np.newaxis, :], centroids)
elif affinity == "euclidean":
    scores = euclidean_distances(query_embed[np.newaxis, :], centroids)
else:
    raise ValueError
assigned_cluster = np.argmin(scores)
print(scores)
print("assigned cluster :", assigned_cluster)

clustering/all-mpnet-base-v2-finetuned-genre_unfrozen_base-checkpoint-1735_5clusters_affinity=cosine_linkage=complete.npz

calinski_harabasz_score :  574.5232967124043
davies_bouldin_score :  4.680900109760556
silhouette_score :  0.03694266825914383
[[0.775772   0.715116   0.49308443 0.9924776  0.7309867 ]]
assigned cluster : 2


subset based on query and give top K recommendations

In [9]:
# subset according to cluster
inds = cluster_assignment == assigned_cluster
embeddings_subset = corpus_embeddings[inds]
df_subset = df_clean.iloc[inds]
print("Songs in cluster :", len(embeddings_subset))

# subset according to conditions
df_subset, embeddings_subset = subset_conditions(df_subset, embeddings_subset, conditions, song_metadata)

print("Songs after conditioning :", len(embeddings_subset))

Songs in cluster : 1453
Songs after conditioning : 1453


In [10]:
# compute scores
if affinity == "cosine":
    scores = cosine_distances(query_embed[np.newaxis, :], embeddings_subset)[0]
elif affinity == "euclidean":
    scores = euclidean_distances(query_embed[np.newaxis, :], embeddings_subset)[0]

K = 3
max_len = 300  # for printing lyrics

topk = scores.argsort()[:K]
for i in topk:
    print("Score:", scores[i])
    print('Genre:',df_subset['genre'][i],'Artist:',df_subset['artist'][i],'SongName:',df_subset['song_name'][i])
    print("Lyrics:", df_subset['lyrics'][i][:max_len])
    print('*****')

Score: 0.42766857
Genre: soul/disco Artist: Larry Graham SongName: When We Get Married
Lyrics: Darling, oh.... When we get married. We'll have a big celebration. And send invitations. To all our friends and relations. And we'll have a ball. Dancin' and all. When we get married. When the bells ring. To tell the world I'm taking your hand. Folks from all over. Will come to see the wedding we pl
*****
Score: 0.44080472
Genre: pop Artist: Barry Manilow SongName: It's a Miracle
Lyrics: You wouldn't believe where I've been. The cities and towns I've been in. From Boston to Denver. And every town in between. (Everyone looks the same). The people they all look the same. (Yes, the same). Oh, only the names have been changed. (Just the names). But now that I'm home again. I'll tell you
*****
Score: 0.4504236
Genre: soul/disco Artist: Natalie Cole SongName: Party Lights
Lyrics: I see the party lights. Shining in the night. Make me feel all right. I see the party lights. Shining in the night. Real

recommendations without clustering

In [11]:
# subset according to conditions
print("Before condition subsetting : ", len(corpus_embeddings))
df_subset, embeddings_subset_no_clust = subset_conditions(df_clean, corpus_embeddings, conditions, song_metadata)
print("After condition subsetting : ", len(embeddings_subset_no_clust))
print()

# compute scores
if affinity == "cosine":
    scores_no_clust = cosine_distances(query_embed[np.newaxis, :], embeddings_subset_no_clust)[0]
elif affinity == "euclidean":
    scores_no_clust = euclidean_distances(query_embed[np.newaxis, :], embeddings_subset_no_clust)[0]

topk = scores_no_clust.argsort()[:K]
for i in topk:
    print("Score:", scores_no_clust[i])
    print('Genre:',df_subset['genre'][i],'Artist:',df_subset['artist'][i],'SongName:',df_subset['song_name'][i])
    print("Lyrics:", df_subset['lyrics'][i][:max_len])
    print('*****')

Before condition subsetting :  15863
After condition subsetting :  15863

Score: 0.07271087
Genre: dance pop Artist: Chris Brown SongName: This Christmas
Lyrics: Hang all the mistletoe. I'm gonna get to know you better. This Christmas. And as we trim the tree. How much fun it's gonna be together. This Christmas. The fireside is blazing bright, woah-oh-oh. We're caroling through the night, woah-oh-oh. This Christmas will be. A very special Christmas for me. W
*****
Score: 0.42766857
Genre: soul/disco Artist: Larry Graham SongName: When We Get Married
Lyrics: Darling, oh.... When we get married. We'll have a big celebration. And send invitations. To all our friends and relations. And we'll have a ball. Dancin' and all. When we get married. When the bells ring. To tell the world I'm taking your hand. Folks from all over. Will come to see the wedding we pl
*****
Score: 0.44080472
Genre: pop Artist: Barry Manilow SongName: It's a Miracle
Lyrics: You wouldn't believe where I've been. The cit

recommendation for base model

In [12]:
base_embeddings = get_embeddings("all-mpnet-base-v2", song_lyrics)
query_emd_base = compute_single_embedding("all-mpnet-base-v2", lyrics)


# subset according to conditions
print("Before condition subsetting : ", len(base_embeddings))
df_subset, base_embeddings_subset = subset_conditions(df_clean, base_embeddings, conditions, song_metadata)
print("After condition subsetting : ", len(base_embeddings_subset))
print()


# compute scores
if affinity == "cosine":
    scores_base = cosine_distances(query_emd_base[np.newaxis, :], base_embeddings_subset)[0]
elif affinity == "euclidean":
    scores_base = euclidean_distances(query_emd_base[np.newaxis, :], base_embeddings_subset)[0]

print()
topk = scores_base.argsort()[:K]
for i in topk:
    print("Score:", scores_base[i])
    print('Genre:',df_subset['genre'][i],'Artist:',df_subset['artist'][i],'SongName:',df_subset['song_name'][i])
    print("Lyrics:", df_subset['lyrics'][i][:max_len])
    print('*****')

embeddings/all-mpnet-base-v2_embeddings.pt
loading already computed embeddings
Before condition subsetting :  15863
After condition subsetting :  15863


Score: 0.0
Genre: dance pop Artist: Chris Brown SongName: This Christmas
Lyrics: Hang all the mistletoe. I'm gonna get to know you better. This Christmas. And as we trim the tree. How much fun it's gonna be together. This Christmas. The fireside is blazing bright, woah-oh-oh. We're caroling through the night, woah-oh-oh. This Christmas will be. A very special Christmas for me. W
*****
Score: 0.23767686
Genre: pop Artist: Andy Williams SongName: It's the Most Wonderful Time of the Year
Lyrics: It's the most wonderful time of the year. With the kids jingle belling. And everyone telling you be of good cheer. It's the most wonderful time of the year. It's the hap-happiest season of all. With those holiday greetings. And gay happy meetings when friends come to call. It's the hap-happiest seas
*****
Score: 0.24062991
Genre: pop Artist: Just