In [30]:
import json
import numpy as np
import os
from collections import *
from tqdm import tqdm
from scipy.sparse import csr_matrix, vstack, csr_array
import time
import pickle, gzip, joblib, shelve
import tkinter as tk
from tkinter import ttk, font
import time
from itertools import islice, combinations
from datetime import datetime
import plotly.express as px
import random

In [61]:
# Directory whose files are later opened and parsed as JSON slices with a 'playlists' key.
directory_path = 'data/raw'

# Position (in sorted filename order) of the first slice kept; everything before it is skipped.
FIRST_TEST_SLICE = 985
# Playlists assumed per slice file; used only for the size estimate printed below.
PLAYLISTS_PER_SLICE = 1000

# os.listdir returns entries in arbitrary order, so sort to make the selection deterministic.
# NOTE: this is a lexicographic sort of the file names, not a numeric one.
filenames = sorted(os.listdir(directory_path))
fullpaths = [os.path.join(directory_path, f) for f in filenames][FIRST_TEST_SLICE:]
print(len(fullpaths) * PLAYLISTS_PER_SLICE)
# NOTE(review): `save` is not read in the cells visible here — presumably a toggle for
# persisting results further down; confirm before removing.
save = False

15000


In [13]:
# Load the challenge set and keep its list of playlists.
# NOTE(review): this is a machine-specific absolute path; it will not resolve on another
# machine and should become a path relative to the project's data directory.
# To inspect a raw slice instead, open one of the entries of `fullpaths` (e.g. fullpaths[1]).
challenge_set_path = '/Users/idotzhori/Desktop/Linear Algebra/SongRank2/data_test/challenge_set.json'

# JSON text is UTF-8 by specification; pass the encoding explicitly so decoding does not
# depend on the platform's locale default.
with open(challenge_set_path, encoding='utf-8') as f:
    mpd_slice = json.load(f)

playlists = mpd_slice['playlists']

In [16]:
# Per-playlist counts read from each playlist's metadata, one list per field,
# in the same order as `playlists`.
holdouts = [entry['num_holdouts'] for entry in playlists]
tracks = [entry['num_tracks'] for entry in playlists]
samples = [entry['num_samples'] for entry in playlists]

In [27]:
def _show_histogram(values, title, axis_label, nbins=30):
    """Build one plotly histogram of `values`, display it, and return the figure."""
    figure = px.histogram(values, title=title, labels={'value': axis_label}, nbins=nbins)
    figure.show()
    return figure


# One histogram per metadata field collected from the playlists.
fig1 = _show_histogram(holdouts, 'Distribution of Holdouts', 'Number of Holdouts')

fig2 = _show_histogram(tracks, 'Distribution of Tracks', 'Number of Tracks')

fig3 = _show_histogram(samples, 'Distribution of Samples', 'Number of Samples')

In [63]:
def construct_test_set(playlists, holdout_percentage=0.25, min_tracks=50, rng=None):
    """Split every sufficiently long playlist into hidden and visible track URIs.

    Each qualifying playlist's tracks are shuffled (on a copy, the input is not
    modified); the first part of the shuffled list becomes the holdout and the
    rest is what a recommender gets to see.

    Args:
        playlists: iterable of dicts with the keys 'pid', 'num_tracks' and
            'tracks'; every element of 'tracks' is a dict with a 'track_uri' key.
        holdout_percentage: fraction in [0, 1] of the playlist to hide. The
            holdout size is ``int(num_tracks * holdout_percentage)``, i.e. it is
            rounded down.
        min_tracks: only playlists with strictly more than this many tracks
            (according to their 'num_tracks' field) are included.
        rng: optional object with a ``shuffle`` method (e.g. ``random.Random(seed)``)
            for a reproducible split. When omitted, the global ``random`` module
            is used, so the result depends on its current state.

    Returns:
        dict mapping pid -> {'holdout_tracks': [...], 'tracks_for_recommendation': [...]},
        both lists holding track URIs.

    Raises:
        ValueError: if ``holdout_percentage`` is outside [0, 1]. A negative value
            would otherwise turn into a negative slice bound and silently
            produce a meaningless split.
    """
    if not 0 <= holdout_percentage <= 1:
        raise ValueError(
            f'holdout_percentage must be between 0 and 1, got {holdout_percentage!r}'
        )

    shuffler = random if rng is None else rng
    test_set = {}

    for playlist in playlists:
        if playlist['num_tracks'] <= min_tracks:
            continue

        # Shuffle a copy so the caller's playlist keeps its original order.
        shuffled_tracks = list(playlist['tracks'])
        shuffler.shuffle(shuffled_tracks)

        num_holdout_tracks = int(playlist['num_tracks'] * holdout_percentage)

        # NOTE: the split is positional, not by URI — a URI that occurs more than
        # once in a playlist can end up in both lists.
        holdout_tracks = shuffled_tracks[:num_holdout_tracks]
        remaining_tracks = shuffled_tracks[num_holdout_tracks:]

        test_set[playlist['pid']] = {
            'holdout_tracks': [t['track_uri'] for t in holdout_tracks],
            'tracks_for_recommendation': [t['track_uri'] for t in remaining_tracks],
        }

    return test_set

# Construct the overall test set by merging the per-slice test sets.
overall_test_set = {}
for path in fullpaths:
    # JSON is UTF-8 by specification; do not rely on the platform's locale default.
    with open(path, encoding='utf-8') as f:
        mpd_slice = json.load(f)
    # Uses construct_test_set's default holdout fraction (25% of each playlist).
    slice_test_set = construct_test_set(mpd_slice['playlists'])
    # dict.update: an entry from a later slice replaces an earlier one with the same pid.
    overall_test_set.update(slice_test_set)

print(f'size of test set: {len(overall_test_set)}')


size of test set: 7282


In [14]:
# Load the co-occurrence matrix back from its gzip-compressed pickle on disk.
f_time = '01_12_08_2023'  # tag embedded in the stored file's name
save_path = os.path.join('song_data', f'{f_time}_cooccurrence_matrix.gz')

# pickle.load can execute arbitrary code; only open files this project wrote itself.
with gzip.open(save_path, mode='rb') as matrix_file:
    cooccurrence_matrix = pickle.load(matrix_file)

In [5]:
# Reassemble the PMI matrix by vertically stacking the chunk files stored in folder_path.
folder_path = 'song_data/pmi'
pmi_run_tag = '01_12_08_2023'  # only files whose name contains this tag are loaded


def _chunk_position(chunk_path):
    """Return the integer in the fifth '_'-separated field of the chunk's file name.

    The file name (not the whole path) is parsed so the result does not depend on
    how many underscores the folder path happens to contain.
    """
    return int(os.path.basename(chunk_path).split('_')[4])


saved_chunk_files = [
    os.path.join(folder_path, f) for f in os.listdir(folder_path) if pmi_run_tag in f
]
if not saved_chunk_files:
    # vstack on an empty list fails with an unhelpful message; say what is missing.
    raise FileNotFoundError(f'no chunk files containing {pmi_run_tag!r} found in {folder_path!r}')

# Chunks must be stacked in their numeric order; a plain string sort would put 10 before 2.
saved_chunk_files = sorted(saved_chunk_files, key=_chunk_position)

chunks = []
for chunk_file in saved_chunk_files:
    # pickle.load can execute arbitrary code; only open files this project wrote itself.
    with gzip.open(chunk_file, 'rb') as f:
        chunks.append(pickle.load(f))

pmi_matrix = vstack(chunks, format='csr')
print(f'Finished processing matrix size: {pmi_matrix.shape}')

Finished processing matrix size: (1341936, 1341936)


In [6]:
# Load the stored song -> index mapping from its gzip-compressed pickle.
f_time = '00_12_08_2023'  # tag embedded in the stored file's name
save_path = os.path.join('song_data', f'{f_time}_song_indices.gz')

# pickle.load can execute arbitrary code; only open files this project wrote itself.
with gzip.open(save_path, mode='rb') as indices_file:
    song_indices = pickle.load(indices_file)

# Number of entries in the mapping; used as the length of the playlist vectors.
num_songs = len(song_indices)
print(num_songs)

1341936


In [9]:
def user_playlist_vector(playlist_songs, song_indices, num_songs):
    """Encode a playlist as a sparse indicator vector over all songs.

    Args:
        playlist_songs: sized collection of song keys that are present in
            ``song_indices``.
        song_indices: mapping from song key to its integer position in the vector.
        num_songs: length of the vector to build.

    Returns:
        A ``csr_array`` with 1 at the position of every song in the playlist and
        0 elsewhere. For an empty playlist every entry is 1, so that no song is
        favoured over another.

    Raises:
        KeyError: if a song in ``playlist_songs`` is missing from ``song_indices``.
    """
    if len(playlist_songs) == 0:
        # Previously this all-ones vector was overwritten by the zeros below,
        # so an empty playlist produced an all-zero vector.
        user_vector = np.ones(num_songs)
    else:
        user_vector = np.zeros(num_songs)
        for song in playlist_songs:
            user_vector[song_indices[song]] = 1
    return csr_array(user_vector)

def compute_scores(user_vector, pmi_matrix):
    """Multiply the playlist vector by the PMI matrix and return the scores densely.

    Args:
        user_vector: sparse row vector, as built by ``user_playlist_vector``.
        pmi_matrix: sparse matrix whose row count matches the vector's length.

    Returns:
        The first row of the dense product, as a numpy array.
    """
    product = user_vector.dot(pmi_matrix)
    dense_product = product.toarray()
    return dense_product[0]

def get_top_recommendations(scores, song_data_map, n=10):
    """Return the data of the ``n`` highest-scoring songs.

    NOTE: this reads the notebook-level ``song_indices`` mapping (it is not a
    parameter) to translate vector positions back into song keys.

    Args:
        scores: one score per song position.
        song_data_map: mapping from song key to the data returned for that song.
        n: number of songs to return.

    Returns:
        A list of at most ``n`` entries of ``song_data_map``, ordered from the
        lowest to the highest score among the selected songs (the best match is
        last). Empty when ``n`` is not positive.
    """
    if n <= 0:
        # scores[-0:] would select every song, and a negative n would slice from
        # the front; neither is a "top n".
        return []

    top_indices = np.argsort(scores)[-n:]
    # Build the position -> key list once rather than once per selected song.
    # assumes iterating song_indices yields keys in the order of their index values — TODO confirm
    index_to_song = list(song_indices)
    return [song_data_map[index_to_song[i]] for i in top_indices]

def recommend_songs_pmi(user_playlist, song_indices, pmi_matrix, song_data_map, n=10):
    """Recommend songs for a playlist by scoring every song against the PMI matrix.

    Args:
        user_playlist: song keys of the playlist; each must be in ``song_indices``.
        song_indices: mapping from song key to its row/column position in
            ``pmi_matrix``.
        pmi_matrix: square sparse matrix of pairwise song scores.
        song_data_map: mapping from song key to the data returned per song.
        n: number of recommendations to return.

    Returns:
        Whatever ``get_top_recommendations`` returns for the computed scores.
    """
    # Take the vector length from the matrix itself instead of the notebook-global
    # `num_songs`; the product below requires these two sizes to agree anyway.
    vector_length = pmi_matrix.shape[0]
    user_vector = user_playlist_vector(user_playlist, song_indices, vector_length)
    scores = compute_scores(user_vector, pmi_matrix)
    return get_top_recommendations(scores, song_data_map, n)