# Musical Semantic Embeddings (MuSE)
# CS 229 - Final Project
# Eric Lee and Akshar Sarvesh

In [None]:
"""
Import statements and initializations for access to database.
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import csv
import ast
import os
import requests
from dotenv import load_dotenv
import mysql.connector
import ssl
import pymysql
import random
from scipy.sparse import lil_matrix, csr_matrix

# Load the .env file so the DB_* settings read with os.getenv() are available.
load_dotenv()

# Delimiter used to join list-valued fields (artists, artist IDs, playlist IDs)
# into a single string for a MySQL column, and to split them again on read.
DELIMITER = "<BRK>"

# Row counter for the CSV import cell. It starts at -1 so that the first CSV
# row is skipped there (presumably a header row) and the next row is song 0.
counter = -1

# Fix the seed of Python's `random` module for reproducible debugging.
# NumPy's separate `np.random` generator is not seeded here.
random.seed(17)


In [None]:
"""
Load songs from .csv files into MySQL database hosted in DigitalOcean.
"""

# Guard flag so the inserts below are not re-run every time this cell is
# executed.
update_songs_flag = False

conn = pymysql.connect(
    user=os.getenv('DB_USERNAME'),
    password=os.getenv('DB_PASSWORD'),
    host=os.getenv('DB_HOST'),
    port=int(os.getenv('DB_PORT')),
    database=os.getenv('DB_NAME'),
    ssl={'ca': './ca-certificate.crt'}
)

cursor = conn.cursor()

try:
    # If flag is active, then parse tracks from .csv and add into database.
    if update_songs_flag:
        query = """
            INSERT INTO CS_229_SONGS_ALL (SONG_NUM, NAME, ARTISTS, SONG_ID, POPULARITY, ARTIST_ID, PLAYLIST_IDS, NUM_PLAYLISTS)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """

        # newline='' lets the csv module handle line endings inside quoted
        # fields itself, as its documentation recommends.
        with open('final_tracks.csv', mode='r', newline='') as file:
            # Numbering restarts at -1 on every run (instead of continuing
            # from a stale global), so the first CSV row is skipped and the
            # following row always becomes song 0.
            for counter, row in enumerate(csv.reader(file), start=-1):
                if counter < 0:
                    continue

                index = counter
                name = row[1]
                # List-valued columns are stored in the CSV as Python list
                # literals; they are flattened into DELIMITER-joined strings.
                artists_str = DELIMITER.join(ast.literal_eval(row[2]))
                song_id = row[3]
                popularity = row[4]
                artist_ids_str = DELIMITER.join(ast.literal_eval(row[8]))
                playlist_ids = ast.literal_eval(row[9])
                num_playlists = len(playlist_ids)
                playlist_ids_str = DELIMITER.join(playlist_ids)
                print(str(index) + ":", name, "-", artists_str)

                # Insert into the database and commit after every row so that
                # rows already written survive a later failure.
                cursor.execute(query, (index, name, artists_str, song_id,
                               popularity, artist_ids_str, playlist_ids_str, num_playlists))
                conn.commit()
finally:
    # Close the connection even if reading the CSV or an insert fails.
    conn.close()


In [None]:
"""
Query the Songs table in database to sample the playlists to use.
"""

# IDs of every playlist that contains at least one of the top songs.
playlist_set = set()
# Song numbers selected so far (extended by the next cell).
songs_set = set()
# How many of the most-playlisted songs to use as seeds.
num_songs = 10
song_to_name = {}

# The shared connection is closed at the end of the previous cell, so
# re-open it if necessary before querying.
conn.ping(reconnect=True)

try:
    with conn.cursor() as cursor:
        # SQL query to find the top N songs based on NUM_PLAYLISTS.
        # The limit is passed as a query parameter rather than formatted
        # into the SQL text.
        query = """
            SELECT SONG_NUM, NAME, NUM_PLAYLISTS, PLAYLIST_IDS, ARTISTS
            FROM CS_229_SONGS_ALL
            ORDER BY NUM_PLAYLISTS DESC
            LIMIT %s
        """
        cursor.execute(query, (num_songs,))

        # Fetch all results
        top_songs = cursor.fetchall()

        # Collect each top song's playlists and print a summary line.
        for song in top_songs:
            playlist_ids_list = song[3].split(DELIMITER)
            playlist_set.update(playlist_ids_list)
            songs_set.add(song[0])
            print(
                f"Song Number: {song[0]}, Name: {song[1]}, Number of Playlists: {song[2]}")

finally:
    # Close the connection
    conn.close()


In [None]:
"""
Iterate through playlists to gather the set of all unique songs within
those playlists, from the database.
"""

# This dict maps each sampled playlist ID to the list of song numbers found
# in that playlist.
playlist_songs_dict = {playlist: [] for playlist in playlist_set}

# The shared connection is closed at the end of the previous cell, so
# re-open it if necessary before querying.
conn.ping(reconnect=True)

try:
    with conn.cursor() as cursor:
        query = """
            SELECT SONG_NUM, PLAYLIST_IDS, NAME, ARTISTS
            FROM CS_229_SONGS_ALL
            ORDER BY NUM_PLAYLISTS DESC
        """
        cursor.execute(query)
        # Kept at module level: a later cell iterates all_songs again.
        all_songs = cursor.fetchall()

        for song in all_songs:
            # Only playlists that were sampled in playlist_set are of interest.
            sampled_playlists = [playlist for playlist in song[1].split(DELIMITER)
                                 if playlist in playlist_set]
            for playlist in sampled_playlists:
                playlist_songs_dict[playlist].append(song[0])

            # A song joins the vocabulary if it is in any sampled playlist.
            if sampled_playlists:
                songs_set.add(song[0])

finally:
    # Close the connection
    conn.close()


In [None]:
"""
Split data into training set, validation set, and test set, as well as 
generating the examples necessary from the validation/test set for 
quantitative evaluation. 
"""

# Create train, val, and test sets
train_playlists = []
train_playlist_set = set()
train_playlist_songs_dict = {}
validation_playlists = []
test_playlists = []

# Deterministic split by position in the sorted playlist IDs: every 9th
# playlist goes to test, every 10th (that is not also a 9th) to validation,
# and the rest to training.
# NOTE(review): this yields roughly 11% test / 9% validation rather than an
# exact 10% / 10% split; left unchanged so existing splits stay reproducible.
for count, playlist in enumerate(sorted(playlist_set)):
    playlist_songs = playlist_songs_dict[playlist]
    if count % 9 == 0:
        test_playlists.append(playlist_songs)
    elif count % 10 == 0:
        validation_playlists.append(playlist_songs)
    else:
        train_playlists.append(playlist_songs)
        train_playlist_set.add(playlist)
        train_playlist_songs_dict[playlist] = playlist_songs

# The training vocabulary is the union of all songs in the training playlists.
# The set is built up with union() one playlist at a time so that its
# iteration order (used later to assign matrix indexes) is not disturbed.
train_songs = set(train_playlists[0]) if train_playlists else set()
for playlist in train_playlists[1:]:
    train_songs = train_songs.union(set(playlist))

# Built once instead of on every sampling iteration.
train_songs_list = list(train_songs)


def _sample_positive_pairs(playlists, known_songs, num_pairs):
    """Return num_pairs label-1 examples of adjacent songs from held-out playlists.

    A pair is only used when both songs are in known_songs. Sampling stops as
    soon as num_pairs examples exist, so a large number of playlists cannot
    overshoot the target.

    Raises:
        ValueError: if no playlist contains an adjacent pair of known songs,
            in which case sampling could never terminate.
    """
    # random.randint(0, len - 2) is only valid for playlists with 2+ songs.
    usable = [playlist for playlist in playlists if len(playlist) >= 2]
    has_candidate = any(
        first in known_songs and second in known_songs
        for playlist in usable
        for first, second in zip(playlist, playlist[1:])
    )
    if num_pairs > 0 and not has_candidate:
        raise ValueError(
            "No held-out playlist has two adjacent songs in the training set.")

    examples = []
    while len(examples) < num_pairs:
        for playlist in usable:
            i = random.randint(0, len(playlist) - 2)
            if playlist[i] in known_songs and playlist[i + 1] in known_songs:
                examples.append((playlist[i], playlist[i + 1], 1))
                if len(examples) == num_pairs:
                    break
    return examples


def _append_negative_pairs(examples, playlists, candidate_songs, num_pairs):
    """Append num_pairs label-0 examples of songs that share no held-out playlist.

    Raises:
        ValueError: if fewer than two candidate songs exist.
    """
    if num_pairs > 0 and len(candidate_songs) < 2:
        raise ValueError("Need at least two training songs to sample negative pairs.")

    # Sets make the "both songs in this playlist" check cheap.
    playlist_song_sets = [set(playlist) for playlist in playlists]
    last_index = len(candidate_songs) - 1
    target = len(examples) + num_pairs
    while len(examples) < target:
        song_1 = candidate_songs[random.randint(0, last_index)]
        song_2 = candidate_songs[random.randint(0, last_index)]
        # A song paired with itself is never a valid negative example.
        if song_1 == song_2:
            continue
        if any(song_1 in songs and song_2 in songs for songs in playlist_song_sets):
            continue
        examples.append((song_1, song_2, 0))


# Generate validation examples for hyperparameter tuning evaluation:
# 100 positive pairs followed by 100 negative pairs.
validation_examples = _sample_positive_pairs(validation_playlists, train_songs, 100)
_append_negative_pairs(validation_examples, validation_playlists, train_songs_list, 100)

# Generate test examples for final evaluation, built the same way.
test_examples = _sample_positive_pairs(test_playlists, train_songs, 100)
_append_negative_pairs(test_examples, test_playlists, train_songs_list, 100)

# From here on the vocabulary is restricted to songs seen in training.
songs_set = train_songs


In [None]:
"""
Creating the co-occurrence matrix M, populating from the playlist-song membership
data we have.
"""

# The vocabulary is songs_set; give every song a row/column index in the
# order the set is iterated.
song_to_index = {}
for song in songs_set:
    song_to_index[song] = len(song_to_index)

# Square, initially empty co-occurrence matrix in LIL format, which supports
# the element-wise updates below.
cooccurrence_matrix = lil_matrix((len(songs_set), len(songs_set)), dtype=int)

playlist_counter = 0
for playlist in train_playlist_set:
    # Matrix indexes of the songs that belong to this training playlist.
    indexes = [song_to_index[song]
               for song in train_playlist_songs_dict[playlist]]

    # Count every unordered pair of positions once, in both directions, so
    # the matrix stays symmetric.
    for position, row in enumerate(indexes):
        for col in indexes[position + 1:]:
            cooccurrence_matrix[row, col] += 1
            cooccurrence_matrix[col, row] += 1

    playlist_counter += 1
    if not playlist_counter % 100:
        print("Finished processing playlist #" + str(playlist_counter))


In [None]:
"""
Sanity checks on the co-occurrence matrix to ensure it is of proper form.
"""

# Convert to CSR and report the shape and sparsity of the co-occurrence matrix.
cooccurrence_matrix = cooccurrence_matrix.tocsr()
matrix_rows, matrix_cols = cooccurrence_matrix.shape
print("Co-occurrence matrix shape:", cooccurrence_matrix.shape)
print("Number of non-zero entries:", cooccurrence_matrix.nnz)
# The total number of cells comes from the actual shape rather than a
# hard-coded vocabulary size, so the count stays right for any data sample.
print("Number of zero entries:", str(matrix_rows * matrix_cols - cooccurrence_matrix.nnz))

# Ensure the values within the matrix make sense: because pairs are counted in
# both directions, row sums and column sums are expected to match.
row_sums = cooccurrence_matrix.sum(axis=1)
column_sums = cooccurrence_matrix.sum(axis=0)
row_sums = np.array(row_sums).flatten()
column_sums = np.array(column_sums).flatten()
print("Row sums (first 10):", row_sums[:10])
print("Column sums (first 10):", column_sums[:10])


In [1]:
"""
Initialize the embeddings before training.
"""
def init_embeddings(num_songs, dim=25):
    """Create the starting parameters for training.

    Returns a tuple (embeddings, bias): embeddings is a (num_songs, dim) array
    of standard-normal draws scaled by 0.01, and bias is a zero vector of
    length num_songs.
    """
    bias = np.zeros(num_songs)
    embeddings = 0.01 * np.random.randn(num_songs, dim)
    return embeddings, bias


"""
Establish the weighting function, (x / x_max) ** alpha capped at 1, used to weight each co-occurrence pair during embedding training.
"""
def weighting_func(x, x_max, alpha=0.75):
    """Weight co-occurrence counts: (x / x_max) ** alpha below x_max, otherwise 1."""
    below_cap = x < x_max
    scaled = (x / x_max) ** alpha
    return np.where(below_cap, scaled, 1)


"""
Establish the training loop, which applies a gradient update for each non-zero co-occurrence pair in turn (per-pair stochastic updates rather than full-batch gradient descent).
"""
def basic_train(cooccurrence_matrix, vocab_size, emb_dim, lr=0.05, epochs=150, x_max=1000, alpha=0.75):
    """Train song embeddings from a co-occurrence matrix with per-pair updates.

    For every non-zero entry x_ij the prediction is
    dot(embeddings[i], embeddings[j]) + bias[i] + bias[j], and the loss is
    weight_ij * (prediction - log(x_ij + 1)) ** 2 with weight_ij taken from
    weighting_func. Parameters are updated immediately after each pair.

    Args:
        cooccurrence_matrix: sparse matrix of co-occurrence counts; must
            provide .tocoo().
        vocab_size: number of songs, i.e. rows of the embedding table.
        emb_dim: embedding dimension; also used in the saved file names.
        lr: learning rate of the per-pair updates.
        epochs: maximum number of passes over the non-zero entries.
        x_max: count at which the pair weight saturates at 1.
        alpha: exponent of the weighting function.

    Returns:
        The trained embeddings array. The bias vector is saved to disk but
        not returned.

    Side effects:
        Prints the loss after every epoch and writes the embeddings and
        biases as .npy files under basic_embeddings/.
    """
    # Initialize embeddings and bias
    embeddings, bias = init_embeddings(vocab_size, emb_dim)

    # COO format exposes the non-zero entries as parallel row/col/data arrays.
    coo_matrix = cooccurrence_matrix.tocoo()

    # Per-pair weights and log targets do not change between epochs, so they
    # are computed once, vectorized, instead of inside the training loop.
    weights = weighting_func(coo_matrix.data, x_max, alpha)
    log_counts = np.log(coo_matrix.data + 1)  # Laplace Smoothing
    total_loss_history = []

    # Training loop
    for epoch in range(epochs):
        total_loss = 0

        for i, j, log_x_ij, weight in zip(coo_matrix.row, coo_matrix.col, log_counts, weights):
            # Prediction error for this pair (dot product + biases - target).
            error = np.dot(embeddings[i], embeddings[j]) + bias[i] + bias[j] - log_x_ij
            total_loss += weight * error ** 2

            # Both gradients are computed before either row is changed, so
            # the second update does not see the first one.
            grad_common = 2 * weight * error
            grad_emb_i = grad_common * embeddings[j]
            grad_emb_j = grad_common * embeddings[i]

            # Update embeddings and biases
            embeddings[i] -= lr * grad_emb_i
            embeddings[j] -= lr * grad_emb_j
            bias[i] -= lr * grad_common
            bias[j] -= lr * grad_common

        total_loss_history.append(total_loss)
        print(f"Epoch {epoch} Loss: {total_loss}")

        # Stop early once the epoch loss changes by less than 1e-5.
        if epoch > 0 and np.abs(total_loss_history[-1] - total_loss_history[-2]) < 1e-5:
            print(f"Convergence reached at epoch {epoch + 1}.")
            break

    # Save embeddings for easy future evaluation
    os.makedirs("basic_embeddings", exist_ok=True)
    np.save(
        f"basic_embeddings/basic_song_embeddings_dim_{emb_dim}.npy", embeddings)
    np.save(f"basic_embeddings/basic_song_bias_dim_{emb_dim}.npy", bias)
    # The message names the directory the files are actually written to.
    print("Embeddings and biases saved to 'basic_embeddings/' directory.")

    return embeddings


In [None]:
"""
Establishing functions for quantitative evaluation and qualitative evaluation. 
"""

# Dead zone for cosine-similarity predictions: the accuracy functions below
# predict "positive" only above +TEST_EPSILON and "negative" only below
# -TEST_EPSILON; pairs whose similarity falls in between are not counted.
TEST_EPSILON = 0.05

"""
Looks up each embedded song's name and artists in the database, optionally reports validation/test metrics, and plots selected songs in 2-D with t-SNE.
"""
def evaluate_embeddings(embeddings, val, test):
    """Attach song metadata to embeddings, optionally score them, and plot a sample.

    Args:
        embeddings: array whose rows are indexed by the module-level
            song_to_index mapping.
        val: if truthy, print validation metrics via get_val_accuracy.
        test: if truthy, print test metrics via get_test_accuracy.

    Returns:
        dict mapping song number -> (name, list of artist names, embedding row).

    Raises:
        KeyError: if a song in song_to_index has no row in CS_229_SONGS_ALL.
    """
    # Note that we only need to do this once per embedding after training.
    load_dotenv()
    # NOTE(review): unlike the module-level connection created earlier in this
    # file, no `ssl` argument is passed here - confirm that is intended.
    conn = pymysql.connect(
        user=os.getenv('DB_USERNAME'),
        password=os.getenv('DB_PASSWORD'),
        host=os.getenv('DB_HOST'),
        port=int(os.getenv('DB_PORT')),
        database=os.getenv('DB_NAME')
    )

    # Fetch name/artist information for all songs with one query instead of
    # one query per song, and always release the connection afterwards.
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT SONG_NUM, NAME, ARTISTS
                FROM CS_229_SONGS_ALL
            """)
            song_rows = cursor.fetchall()
    finally:
        conn.close()

    # Keep the first row returned for each song number.
    song_metadata = {}
    for song_num, name, artists in song_rows:
        song_metadata.setdefault(song_num, (name, artists))

    song_to_name_embedding_dict = {}
    for song_id, matrix_index in song_to_index.items():
        if song_id not in song_metadata:
            raise KeyError(f"Song {song_id} was not found in CS_229_SONGS_ALL")
        name, artists = song_metadata[song_id]
        song_to_name_embedding_dict[song_id] = (
            name, artists.split(DELIMITER), embeddings[matrix_index])
        if len(song_to_name_embedding_dict) % 1000 == 0:
            print("Done processing", str(
                len(song_to_name_embedding_dict)), "songs")

    # Hand-picked songs for the t-SNE plot, three per artist:
    # - LADY GAGA: 816 - Just Dance, 817 - Paparazzi, 58385 - Applause
    # - JUSTIN BIEBER: 862 - Ghost, 45032 - Off My Face, 51494 - 2U
    # - KENDRICK LAMAR: 24175 - Alright, 974 - PRIDE., 72162 - Rigamortus
    # - PITBULL: 955 - Time of Our Lives, 9048 - Timber, 9082 - Fireball
    selected_song_ids_1 = [816, 817, 58385, 862, 45032, 51494, 24175, 974, 72162, 955, 9048, 9082]

    # Evaluate quantitative metrics if specified as arguments.
    if val:
        get_val_accuracy(embeddings)
    if test:
        get_test_accuracy(embeddings)

    # Generate t-SNE plots.
    generate_visual(song_to_name_embedding_dict, selected_song_ids_1)
    return song_to_name_embedding_dict


"""
Generate confusion matrix metrics on the validation set, given a set of embeddings. 
"""
def _score_pair_predictions(embeddings, examples):
    """Tally thresholded cosine-similarity predictions for (song, song, label) examples.

    A pair is predicted positive when its cosine similarity exceeds
    TEST_EPSILON and negative when it is below -TEST_EPSILON; pairs in between
    (or with a zero-length embedding) produce no prediction and are not
    counted. Ratios whose denominator is zero are reported as 0.0 instead of
    raising ZeroDivisionError.

    Returns:
        dict with keys "correct", "count", "accuracy", "precision", "recall"
        and "f1".
    """
    true_positive_sum = 0
    true_negative_sum = 0
    positive_count = 0
    negative_count = 0

    for song_1, song_2, label in examples:
        embedding_1 = embeddings[song_to_index[song_1]]
        embedding_2 = embeddings[song_to_index[song_2]]
        norm_product = np.linalg.norm(embedding_1) * np.linalg.norm(embedding_2)
        if norm_product == 0:
            # Cosine similarity is undefined here, so no prediction is made.
            continue
        cosine_sim = np.dot(embedding_1, embedding_2) / norm_product
        if cosine_sim > TEST_EPSILON:
            if label == 1:
                true_positive_sum += 1
            positive_count += 1
        elif cosine_sim < -1 * TEST_EPSILON:
            if label == 0:
                true_negative_sum += 1
            negative_count += 1

    total_correct = true_positive_sum + true_negative_sum
    total_count = positive_count + negative_count
    # Negative predictions that were not true negatives are false negatives.
    false_negative_sum = negative_count - true_negative_sum
    actual_positive_count = true_positive_sum + false_negative_sum

    accuracy = total_correct / total_count if total_count else 0.0
    precision = true_positive_sum / positive_count if positive_count else 0.0
    recall = true_positive_sum / actual_positive_count if actual_positive_count else 0.0
    f1 = (2 * precision * recall) / (precision + recall) if precision + recall else 0.0
    return {
        "correct": total_correct,
        "count": total_count,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


def get_val_accuracy(embeddings):
    """Print accuracy, precision, recall and F1 on validation_examples; return the F1."""
    metrics = _score_pair_predictions(embeddings, validation_examples)
    print(f"Validation accuracy is {metrics['accuracy']}")
    print(f"Validation precision is {metrics['precision']}")
    print(f"Validation recall is {metrics['recall']}")
    print(f"Validation F1 is {metrics['f1']}")
    return metrics["f1"]


def get_test_accuracy(embeddings):
    """Print accuracy, precision, recall and F1 on test_examples; return the F1."""
    metrics = _score_pair_predictions(embeddings, test_examples)
    print(f"Test accuracy is {metrics['correct']} / {metrics['count']} = {metrics['accuracy']}")
    print(f"Test precision is {metrics['precision']}")
    print(f"Test recall is {metrics['recall']}")
    print(f"Test F1 is {metrics['f1']}")
    return metrics["f1"]


"""
Reduces the dimensionality of embeddings to 2-D with t-SNE and plots.
"""
def generate_visual(song_to_name_embedding_dict, song_ids):
    """Project the embeddings of the given songs to 2-D with t-SNE and plot them.

    Args:
        song_to_name_embedding_dict: dict mapping song number ->
            (name, list of artist names, embedding).
        song_ids: song numbers to plot; each must be a key of the dict.

    Raises:
        ValueError: if fewer than two songs are given.
        KeyError: if a song number is missing from the dict.
    """
    if len(song_ids) < 2:
        raise ValueError("At least two songs are needed for a t-SNE plot.")

    # Get the corresponding embeddings
    selected_embeddings = np.array(
        [song_to_name_embedding_dict[song_id][2] for song_id in song_ids])

    # t-SNE requires perplexity to be lower than the number of samples, so the
    # usual value of 10 is capped for short song lists.
    perplexity = min(10, len(song_ids) - 1)
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    embeddings_2d = tsne.fit_transform(selected_embeddings)

    # Label for every point: "<song name> - <artist>, <artist>, ..."
    labels = {
        song_id: song_to_name_embedding_dict[song_id][0] + " - " +
        ", ".join(song_to_name_embedding_dict[song_id][1])
        for song_id in song_ids
    }

    # Plot the reduced 2D embeddings and add text labels. No colormap is
    # passed because no per-point color values are supplied.
    plt.figure(figsize=(8, 6))
    plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], s=50)
    for i, song_id in enumerate(song_ids):
        plt.text(embeddings_2d[i, 0], embeddings_2d[i, 1],
                 labels[song_id], fontsize=9, ha='right', color='red')
    plt.title('2D t-SNE Visualization of Embeddings')
    plt.xlabel('t-SNE Dimension 1')
    plt.ylabel('t-SNE Dimension 2')
    plt.show()


In [None]:
"""
Basic embeddings with dimension 25.
"""
# Train 25-dimensional embeddings; basic_train also saves them under
# basic_embeddings/ with the dimension in the file name.
embeddings_dim_25 = basic_train(cooccurrence_matrix, cooccurrence_matrix.shape[0], emb_dim=25, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
basic_embeddings_25 = np.load("basic_embeddings/basic_song_embeddings_dim_25.npy")
evaluate_embeddings(basic_embeddings_25, True, True)


In [None]:
"""
Basic embeddings with dimension 75.
"""
# Train 75-dimensional embeddings. emb_dim must match the dimension in the
# file name loaded below, because basic_train names its output after emb_dim.
embeddings_dim_75 = basic_train(cooccurrence_matrix, cooccurrence_matrix.shape[0], emb_dim=75, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
basic_embeddings_75 = np.load("basic_embeddings/basic_song_embeddings_dim_75.npy")
evaluate_embeddings(basic_embeddings_75, True, True)


In [None]:
"""
Basic embeddings with dimension 150.
"""
# Train 150-dimensional embeddings. emb_dim must match the dimension in the
# file name loaded below, because basic_train names its output after emb_dim.
embeddings_dim_150 = basic_train(cooccurrence_matrix, cooccurrence_matrix.shape[0], emb_dim=150, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
basic_embeddings_150 = np.load("basic_embeddings/basic_song_embeddings_dim_150.npy")
evaluate_embeddings(basic_embeddings_150, True, True)


In [None]:
"""
Basic embeddings with dimension 250.
"""
# Train 250-dimensional embeddings. emb_dim must match the dimension in the
# file name loaded below, because basic_train names its output after emb_dim.
embeddings_dim_250 = basic_train(cooccurrence_matrix, cooccurrence_matrix.shape[0], emb_dim=250, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
basic_embeddings_250 = np.load("basic_embeddings/basic_song_embeddings_dim_250.npy")
evaluate_embeddings(basic_embeddings_250, True, True)


In [None]:
"""
Basic embeddings with dimension 500.
"""
# Train 500-dimensional embeddings. emb_dim must match the dimension in the
# file name loaded below, because basic_train names its output after emb_dim.
embeddings_dim_500 = basic_train(cooccurrence_matrix, cooccurrence_matrix.shape[0], emb_dim=500, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
basic_embeddings_500 = np.load("basic_embeddings/basic_song_embeddings_dim_500.npy")
evaluate_embeddings(basic_embeddings_500, True, True)


In [None]:
"""
Creating augmented co-occurrence matrix with artist playlists added to training set.
"""
# Copy co-occurrence matrix to increase counts for augmentation. LIL format
# supports the element-wise updates below.
augmented_cooccurrence_matrix = cooccurrence_matrix.copy().tolil()

# Group the vocabulary's songs by artist; each artist's song list is then
# treated like one extra playlist.
artists_dict = {}
for song in all_songs:
    song_id, song_artists = song[0], song[3]
    if song_id in song_to_index:
        # Same delimiter the artists column was written with.
        for artist in song_artists.split(DELIMITER):
            artists_dict.setdefault(artist, []).append(song_id)
print("Done creating augmented artist playlists!")
print(f"We have {len(artists_dict)} artists to analyze")

# Iterate through all artist playlists to increment the augmented co-occurrence matrix.
new_playlist_counter = 0
for artist_songs in artists_dict.values():
    indexes = [song_to_index[song] for song in artist_songs]
    # Artists with a single song contribute no pairs; the loops simply do nothing.
    for position, row in enumerate(indexes):
        for col in indexes[position + 1:]:
            augmented_cooccurrence_matrix[row, col] += 1
            augmented_cooccurrence_matrix[col, row] += 1
    new_playlist_counter += 1
    if new_playlist_counter % 100 == 0:
        print("Finished processing playlist #" + str(new_playlist_counter))
print("Augmented cooccurrence matrix with artists counts finished and stored in variable `augmented_cooccurrence_matrix`")

# Store the CSR conversion under the correctly spelled name, so the variable
# announced above really holds the final matrix.
augmented_cooccurrence_matrix = augmented_cooccurrence_matrix.tocsr()
# Alias for the previous, misspelled variable name in case other cells use it.
augmented_coocurrence_matrix = augmented_cooccurrence_matrix


In [None]:
"""
Define augmented training function, mostly for the saving path for weights. 
"""
def aug_train(aug_cooccurrence_matrix, vocab_size, emb_dim, lr=0.05, epochs=150, x_max=1000, alpha=0.75):
    """Train song embeddings from the augmented co-occurrence matrix.

    Same per-pair training procedure as basic_train, but the results are
    saved under aug_embeddings/ and the early-stopping tolerance is 1e-4.
    For every non-zero entry x_ij the prediction is
    dot(embeddings[i], embeddings[j]) + bias[i] + bias[j], and the loss is
    weight_ij * (prediction - log(x_ij + 1)) ** 2.

    Args:
        aug_cooccurrence_matrix: sparse matrix of (augmented) co-occurrence
            counts; must provide .tocoo().
        vocab_size: number of songs, i.e. rows of the embedding table.
        emb_dim: embedding dimension; also used in the saved file names.
        lr: learning rate of the per-pair updates.
        epochs: maximum number of passes over the non-zero entries.
        x_max: count at which the pair weight saturates at 1.
        alpha: exponent of the weighting function.

    Returns:
        The trained embeddings array. The bias vector is saved to disk but
        not returned.
    """
    # Initialize embeddings and bias
    embeddings, bias = init_embeddings(vocab_size, emb_dim)

    # Train on the matrix that was passed in, not on the module-level
    # (non-augmented) cooccurrence_matrix.
    coo_matrix = aug_cooccurrence_matrix.tocoo()

    # Per-pair weights and log targets do not change between epochs, so they
    # are computed once, vectorized, instead of inside the training loop.
    weights = weighting_func(coo_matrix.data, x_max, alpha)
    log_counts = np.log(coo_matrix.data + 1)  # Laplace Smoothing

    total_loss_history = []

    # Training loop
    for epoch in range(epochs):
        total_loss = 0

        for i, j, log_x_ij, weight in zip(coo_matrix.row, coo_matrix.col, log_counts, weights):
            # Prediction error for this pair (dot product + biases - target).
            error = np.dot(embeddings[i], embeddings[j]) + bias[i] + bias[j] - log_x_ij
            total_loss += weight * error ** 2

            # Both gradients are computed before either row is changed, so
            # the second update does not see the first one.
            grad_common = 2 * weight * error
            grad_emb_i = grad_common * embeddings[j]
            grad_emb_j = grad_common * embeddings[i]

            # Update embeddings and biases
            embeddings[i] -= lr * grad_emb_i
            embeddings[j] -= lr * grad_emb_j
            bias[i] -= lr * grad_common
            bias[j] -= lr * grad_common

        total_loss_history.append(total_loss)
        print(f"Epoch {epoch} Loss: {total_loss}")

        # Stop early once the epoch loss changes by less than 1e-4.
        if epoch > 0 and np.abs(total_loss_history[-1] - total_loss_history[-2]) < 1e-4:
            print(f"Convergence reached at epoch {epoch + 1}.")
            break

    # Store embeddings in new folder dedicated to augmented embeddings.
    os.makedirs("aug_embeddings", exist_ok=True)
    np.save(
        f"aug_embeddings/aug_song_embeddings_dim_{emb_dim}.npy", embeddings)
    # NOTE(review): the bias file keeps the "basic_" prefix inside
    # aug_embeddings/; left unchanged in case other code loads this path.
    np.save(f"aug_embeddings/basic_song_bias_dim_{emb_dim}.npy", bias)
    print("Embeddings and biases saved to 'aug_embeddings/' directory.")

    return embeddings


In [None]:
"""
Augmented embeddings with dimension 250.
"""
# Train 250-dimensional embeddings on the augmented matrix. emb_dim must match
# the dimension in the file name loaded below.
aug_embeddings_dim_250 = aug_train(augmented_cooccurrence_matrix, augmented_cooccurrence_matrix.shape[0], emb_dim=250, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate it on the validation and test examples.
aug_embeddings_250 = np.load("aug_embeddings/aug_song_embeddings_dim_250.npy")
evaluate_embeddings(aug_embeddings_250, True, True)


In [None]:
"""
Augmented embeddings with dimension 500.
"""
# Train 500-dimensional embeddings on the augmented matrix. emb_dim must match
# the dimension in the file name loaded below.
aug_embeddings_dim_500 = aug_train(augmented_cooccurrence_matrix, augmented_cooccurrence_matrix.shape[0], emb_dim=500, alpha=0.75, lr=0.05)
# Reload the saved array and evaluate the 500-dimensional embeddings.
aug_embeddings_500 = np.load("aug_embeddings/aug_song_embeddings_dim_500.npy")
evaluate_embeddings(aug_embeddings_500, True, True)

In [None]:
"""
Gather playlist titles for contrastive learning generation.
"""
# Map a dense playlist index (0, 1, 2, ...) to the playlist's ID.
playlist_index_to_ID = dict()

# Map the same dense index to the playlist's title, which is what gets
# embedded with BERT.
playlist_index_to_title = dict()

# The shared connection was closed by an earlier cell, so re-open it if
# necessary before querying.
conn.ping(reconnect=True)

# SQL Query to get this information
try:
    with conn.cursor() as cursor:
        query = """SELECT PLAYLIST_ID, NAME FROM CS_229_PLAYLISTS_ALL"""
        cursor.execute(query)
        # Fetch all results
        playlist_data = cursor.fetchall()
finally:
    # Close the connection
    conn.close()

# Only training playlists get an index; indexes are assigned in the order the
# rows were returned.
index = 0
for playlist in playlist_data:
    if playlist[0] in train_playlist_set:
        playlist_index_to_ID[index] = playlist[0]
        playlist_index_to_title[index] = playlist[1]
        index += 1

print(len(playlist_index_to_title))
print(playlist_index_to_ID)


In [None]:
"""
Import BERT models and tokenize.
"""

import torch
from transformers import BertTokenizer, BertModel

# Load pre-trained model and tokenizer: BERT
bertTokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bertModel = BertModel.from_pretrained('bert-base-uncased')

# n is the number of playlists that actually have a title loaded. Using the
# size of playlist_index_to_title (rather than of train_playlist_set) keeps
# the loop below within the indexes that exist.
n = len(playlist_index_to_title)
# m is the size of BERT's pooled output vector (768 for bert-base-uncased).
m = bertModel.config.hidden_size

# BERTarray[i] holds the unit-length title embedding of playlist index i.
BERTarray = np.zeros((n, m))
with torch.no_grad():
    for i in range(n):
        inputs = bertTokenizer(
            playlist_index_to_title[i], return_tensors="pt", padding=True, truncation=True)
        outputs = bertModel(**inputs)
        # pooler_output has one row per input title; a single title is
        # passed per call, so take that row.
        bertData = outputs.pooler_output[0].numpy()
        # Normalize by magnitude so we can dot product directly
        BERTarray[i] = bertData / np.linalg.norm(bertData)
print(BERTarray)


In [None]:
"""
Generate playlist embeddings by BERT embeddings.
"""
# BERTarray[i] is the unit-length title embedding of playlist index i, so the
# product with its transpose holds all pairwise cosine similarities. The
# identity matrix is then subtracted from the diagonal (self-similarity).
playlist_similarity_scores = BERTarray @ BERTarray.T
playlist_similarity_scores -= np.eye(n)
print(playlist_similarity_scores)


In [None]:
"""
Generate candidates for positive and negative pairs for contrastive learning.
"""

# Number of most-similar and of least-similar playlist pairs to keep.
bestworst_playlist_count = 100

# The similarity matrix is symmetric, so only entries strictly above the
# diagonal are considered. This lists every unordered pair exactly once and
# keeps the diagonal (a playlist paired with itself) out of both rankings.
pair_rows, pair_cols = np.triu_indices(playlist_similarity_scores.shape[0], k=1)
pair_scores = playlist_similarity_scores[pair_rows, pair_cols]

if pair_scores.size < bestworst_playlist_count:
    raise ValueError(
        f"Only {pair_scores.size} playlist pairs available, "
        f"but {bestworst_playlist_count} are required.")

# Positions of the highest-scoring pairs, sorted from most to least similar.
top_positions = np.argpartition(
    pair_scores, -bestworst_playlist_count)[-bestworst_playlist_count:]
top_positions = top_positions[np.argsort(pair_scores[top_positions])[::-1]]

# Positions of the lowest-scoring pairs, sorted from least to most similar.
low_positions = np.argpartition(
    pair_scores, bestworst_playlist_count - 1)[:bestworst_playlist_count]
low_positions = low_positions[np.argsort(pair_scores[low_positions])]

# Similarity values of the selected pairs.
top_10_values = pair_scores[top_positions]
low_10_values = pair_scores[low_positions]

# (playlist index, playlist index) tuples for the selected pairs.
top_sim_results = [(int(pair_rows[p]), int(pair_cols[p])) for p in top_positions]
low_sim_results = [(int(pair_rows[p]), int(pair_cols[p])) for p in low_positions]

print(top_sim_results)


In [None]:
"""
Generate positive and negative pairs by sampling from top results.
"""

# Draw random song pairs from the most similar playlist pairs (positive
# examples) and from the least similar playlist pairs (negative examples).
positive_examples = set()
negative_examples = set()
# Number of draws per playlist pair; duplicates collapse in the sets.
examples_coefficient = 20

# Walk through the ranked playlist pairs.
for rank in range(bestworst_playlist_count):
    similar_pair = top_sim_results[rank]
    top_playlist1 = playlist_index_to_ID[similar_pair[0]]
    top_playlist2 = playlist_index_to_ID[similar_pair[1]]
    dissimilar_pair = low_sim_results[rank]
    low_playlist1 = playlist_index_to_ID[dissimilar_pair[0]]
    low_playlist2 = playlist_index_to_ID[dissimilar_pair[1]]

    for _ in range(examples_coefficient):
        # One song from each playlist of the similar pair.
        song_1 = np.random.choice(train_playlist_songs_dict[top_playlist1])
        song_2 = np.random.choice(train_playlist_songs_dict[top_playlist2])
        if song_1 in train_songs and song_2 in train_songs:
            positive_examples.add((song_1, song_2))

        # One song from each playlist of the dissimilar pair.
        song_1 = np.random.choice(train_playlist_songs_dict[low_playlist1])
        song_2 = np.random.choice(train_playlist_songs_dict[low_playlist2])
        if song_1 in train_songs and song_2 in train_songs:
            negative_examples.add((song_1, song_2))

# Lists allow the examples to be indexed during training.
positive_examples = list(positive_examples)
negative_examples = list(negative_examples)
print(positive_examples)


In [None]:
"""
Define cosine similarity helper function. 
"""
def cosine_similarity(vec1, vec2):
    """Return the cosine of the angle between vec1 and vec2, or 0.0 if either has zero length."""
    numerator = np.dot(vec1, vec2)
    length_1, length_2 = np.linalg.norm(vec1), np.linalg.norm(vec2)

    # Dividing is only safe when both vectors have non-zero length.
    if length_1 != 0 and length_2 != 0:
        return numerator / (length_1 * length_2)
    return 0.0

margin = 0


def contrastive_loss(embeddings, margin):
    """Compute the contrastive loss over the positive and negative song pairs.

    Every positive pair adds 1 - cosine similarity; every negative pair adds
    max(0, cosine similarity - margin).
    """
    def pair_similarity(pair):
        # Cosine similarity between the embeddings of the two songs in a pair.
        first = embeddings[song_to_index[pair[0]]]
        second = embeddings[song_to_index[pair[1]]]
        return cosine_similarity(first, second)

    loss = 0
    # The two example lists may differ in length, so each gets its own loop.
    for pair in positive_examples:
        loss += 1 - pair_similarity(pair)
    for pair in negative_examples:
        loss += max(0, pair_similarity(pair) - margin)
    return loss


In [2]:
"""
Define contrastive learning loop modified from the original training loop. 
"""
# When True, train_contrastive prints the epoch number and the current loss.
verbose = False

"""
Define the contrastive training function. 
"""
def train_contrastive(embeddings, threshold, margin):
    # Use optimal hyperparameters from tuning. 
    learningRate = 0.05
    epsilon = 1e-7
    epochCount = 0
    diff = float('inf')
    prev = 0
    curr = contrastive_loss(embeddings, margin)

    # Print extra statements if verbose specified
    if verbose:
        print(f"Epoch {epochCount}")
        print(f"Loss: {curr}")

    # While not converged yet, keep iterating
    while diff > threshold:
        epochCount += 1
        gradients = np.zeros_like(embeddings)
        
        # Modify embeddings within the positive pairs. 
        for i in range(len(positive_examples)):
            song_1_embedding = embeddings[song_to_index[positive_examples[i][0]]]
            song_2_embedding = embeddings[song_to_index[positive_examples[i][1]]]
            sim = cosine_similarity(song_1_embedding, song_2_embedding)
            gradients[song_to_index[positive_examples[i][0]]] += sim * song_1_embedding / ((np.linalg.norm(song_1_embedding))**2) - (
                (song_2_embedding)/(np.linalg.norm(song_1_embedding) * np.linalg.norm(song_2_embedding)))
            gradients[song_to_index[positive_examples[i][1]]] += sim * song_2_embedding / ((np.linalg.norm(song_2_embedding))**2) - (
                (song_1_embedding)/(np.linalg.norm(song_2_embedding) * np.linalg.norm(song_1_embedding)))
        
        # Modify embeddings within the negative pairs.
        for i in range(len(negative_examples)):
            song_1_embedding = embeddings[song_to_index[negative_examples[i][0]]]
            song_2_embedding = embeddings[song_to_index[negative_examples[i][1]]]
            sim = cosine_similarity(song_1_embedding, song_2_embedding)
            if sim > margin:
                gradients[song_to_index[positive_examples[i][0]]] -= sim * song_1_embedding / ((np.linalg.norm(song_1_embedding))**2) - (
                    (song_2_embedding)/(np.linalg.norm(song_1_embedding) * np.linalg.norm(song_2_embedding)))
                gradients[song_to_index[positive_examples[i][1]]] -= sim * song_2_embedding / ((np.linalg.norm(song_2_embedding))**2) - (
                    (song_1_embedding)/(np.linalg.norm(song_2_embedding) * np.linalg.norm(song_1_embedding)))
        embeddings -= learningRate/(epochCount ** 2) * gradients
        prev = curr
        curr = contrastive_loss(embeddings, margin)
        diff = prev - curr
        if verbose and (epochCount % 100 == 0):
            print(f"Epoch {epochCount}")
            print(f"Loss: {curr}")
            print(f"Difference: {diff}")

In [None]:
"""
Grid search on the margin term for hyperparameter tuning.
"""
# Track the best validation score seen so far and the margin that produced it.
bestEval = 0
bestMargin = 0
threshold = 1e-4
for i in range(-10, 10):
    # Candidate margins run from -1.0 to 0.9 in steps of 0.1.
    margin = i / 10

    # Start every trial from the stored base embeddings, because training
    # modifies the array it is given.
    contrastive_song_embeddings_dim_500 = np.load(
        "basic_embeddings/basic_song_embeddings_dim_500.npy"
    )
    train_contrastive(
        embeddings=contrastive_song_embeddings_dim_500,
        threshold=threshold,
        margin=margin,
    )

    print(f"Margin: {margin}")
    print("Contrastive embeddings dim 500:")
    f1 = get_val_accuracy(contrastive_song_embeddings_dim_500)

    # Keep this margin only if it strictly beats the current best score.
    if bestEval < f1:
        bestEval, bestMargin = f1, margin
        print(bestMargin)


In [None]:
"""
Run the full contrastive training for the embedding models of dimension 250 and 500.
"""
# Tighter convergence threshold than the one used during the grid search.
threshold = 1e-7

# Start from the stored basic embeddings.
contrastive_song_embeddings_dim_250 = np.load(
    "basic_embeddings/basic_song_embeddings_dim_250.npy"
)
contrastive_song_embeddings_dim_500 = np.load(
    "basic_embeddings/basic_song_embeddings_dim_500.npy"
)

print(f"Best Margin: {bestMargin}")
train_contrastive(
    embeddings=contrastive_song_embeddings_dim_250,
    threshold=threshold,
    margin=bestMargin,
)
train_contrastive(
    embeddings=contrastive_song_embeddings_dim_500,
    threshold=threshold,
    margin=bestMargin,
)

# Train the final model which integrates both contrastive and augmentation
contrastive_aug_embeddings_dim_250 = np.load(
    "aug_embeddings/aug_song_embeddings_dim_250.npy"
)
contrastive_aug_embeddings_dim_500 = np.load(
    "aug_embeddings/aug_song_embeddings_dim_500.npy"
)

train_contrastive(
    embeddings=contrastive_aug_embeddings_dim_250,
    threshold=threshold,
    margin=bestMargin,
)
train_contrastive(
    embeddings=contrastive_aug_embeddings_dim_500,
    threshold=threshold,
    margin=bestMargin,
)


In [None]:
"""
Save embeddings to file for easier loading next time.
"""
# Contrastive embeddings trained from the basic embeddings.
os.makedirs("cont_embeddings", exist_ok=True)
np.save(f"cont_embeddings/cont_song_embeddings_dim_{250}.npy", contrastive_song_embeddings_dim_250)
print("Embeddings saved to \'cont_embeddings/\' directory.")
np.save(f"cont_embeddings/cont_song_embeddings_dim_{500}.npy", contrastive_song_embeddings_dim_500)
print("Embeddings saved to \'cont_embeddings/\' directory.")

# Contrastive embeddings trained from the augmented embeddings.
os.makedirs("cont_aug_embeddings", exist_ok=True)
np.save(f"cont_aug_embeddings/cont_aug_song_embeddings_dim_{250}.npy", contrastive_aug_embeddings_dim_250)
print("Embeddings saved to \'cont_aug_embeddings/\' directory.")
np.save(f"cont_aug_embeddings/cont_aug_song_embeddings_dim_{500}.npy", contrastive_aug_embeddings_dim_500)
# Report the directory this file was actually written to.
print("Embeddings saved to \'cont_aug_embeddings/\' directory.")

In [None]:
"""
Evaluate the contrastive models, then compare with the other existing models.
"""
# Each model is reloaded from its saved file and then scored with
# get_val_accuracy.
contrastive_song_embeddings_dim_250 = np.load(f"cont_embeddings/cont_song_embeddings_dim_250.npy")
print("Contrastive embeddings dim 250:")
get_val_accuracy(contrastive_song_embeddings_dim_250)

contrastive_song_embeddings_dim_500 = np.load(f"cont_embeddings/cont_song_embeddings_dim_500.npy")
print("Contrastive embeddings dim 500:")
get_val_accuracy(contrastive_song_embeddings_dim_500)

contrastive_aug_embeddings_dim_250 = np.load(f"cont_aug_embeddings/cont_aug_song_embeddings_dim_250.npy")
print("Contrastive Augmented embeddings dim 250:")
get_val_accuracy(contrastive_aug_embeddings_dim_250)

# Load the augmented file into the augmented variable, so the evaluated array
# is the saved one and the non-augmented 500-dim embeddings stay intact.
contrastive_aug_embeddings_dim_500 = np.load(f"cont_aug_embeddings/cont_aug_song_embeddings_dim_500.npy")
print("Contrastive Augmented embeddings dim 500:")
get_val_accuracy(contrastive_aug_embeddings_dim_500)