In [1]:
import numpy as np
import pandas as pd
import time
from scipy.sparse import coo_matrix
from scipy.sparse import csr_matrix
import lightfm
from lightfm.data import Dataset
from lightfm import LightFM
from lightfm.evaluation import precision_at_k
import matplotlib.pyplot as plt
from distributed import Client
from dask_jobqueue import SLURMCluster
from IPython.display import display
import matplotlib.pyplot as plt
import os
from glob import glob
import dask
import dask.bag as db
import dask.dataframe as dd

  from distributed.utils import tmpfile


In [2]:
# Execution mode switch:
#   LOCAL = True  -> single-machine execution while developing
#   LOCAL = False -> cluster execution through SLURM
LOCAL = False

if not LOCAL:
    # Ensure that Dask uses the correct version of Python on the cluster:
    # the interpreter path is keyed by the installed dask version.
    worker_python = '/scratch/work/public/dask/{}/bin/python'.format(dask.__version__)

    # Place the output logs in an accessible location (/scratch/{your-netid}).
    # The user name is read from the SLURM_JOB_USER environment variable.
    sbatch_options = ['--export NONE --output=/scratch/{}/slurm-%j.out'.format(os.environ['SLURM_JOB_USER'])]

    # This creates a SLURM-backed dask cluster.
    # Memory and core limits should be sufficient here.
    cluster = SLURMCluster(
        memory='32GB',
        cores=4,
        python=worker_python,
        job_extra=sbatch_options,
    )

    cluster.submit_command = 'slurm'
    cluster.scale(50)

    display(cluster)
    client = Client(cluster)
else:
    # This creates a single-machine dask client.
    client = Client()

display(client)

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.32.35.21:8787/status,

0,1
Dashboard: http://10.32.35.21:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.32.35.21:36031,Workers: 0
Dashboard: http://10.32.35.21:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [3]:
# Load the three pre-split parquet files (train / validation / test).
# They were generated after preprocessing the datasets and splitting 80-20;
# the rating column 'song_listens' and the numeric column 'recording_msid_idx'
# were already calculated when these parquets were written.
train_df, validation_df, test_df = (
    pd.read_parquet(parquet_path)
    for parquet_path in (
        "index_training_small.parquet",
        "index_validation_small.parquet",
        "index_test_small.parquet",
    )
)

In [4]:
# Stack all three splits, drop exact duplicate rows and order the result by
# 'recording_msid_idx'. Then number the distinct 'recording_msid_idx' values
# consecutively, starting at 1, and store that number as 'song_id'.
concatenated_data = (
    pd.concat([train_df, test_df, validation_df])
    .drop_duplicates()
    .sort_values(['recording_msid_idx'])
    .assign(
        song_id=lambda frame: frame.groupby(['recording_msid_idx'], sort=False).ngroup() + 1
    )
)

In [5]:
# Left-join every split against the combined frame so each row picks up the
# columns of concatenated_data (including 'song_id'). Columns that exist on
# both sides but are not join keys come back with pandas' _x / _y suffixes.
train_df, validation_df, test_df = (
    split_df.merge(
        concatenated_data,
        on=['recording_msid_idx', 'user_id', 'song_listens'],
        how="left",
    )
    for split_df in (train_df, validation_df, test_df)
)

In [6]:
# Remove the two suffixed 'recording_msid' columns from every split.
train_df, validation_df, test_df = (
    split_df.drop(['recording_msid_x', 'recording_msid_y'], axis=1)
    for split_df in (train_df, validation_df, test_df)
)

In [7]:
# Collect the distinct user and song ids seen across all splits ...
unique_users = np.unique(concatenated_data["user_id"])
unique_songs = np.unique(concatenated_data["song_id"])

# ... and register them with a LightFM Dataset, which the following cells use
# to build the interaction matrices.
data = Dataset()
data.fit(users=unique_users, items=unique_songs)

In [8]:
# Build the training interaction and weight matrices from
# (user_id, song_id, song_listens) triples.
# The triples are produced by zipping the three column arrays positionally
# instead of looking up df[col][i] row by row: this avoids one label-based
# pandas lookup per value and no longer depends on the frame having a
# default 0..n-1 index.
interactions_train, weights_train = data.build_interactions(
    zip(
        train_df['user_id'].to_numpy(),
        train_df['song_id'].to_numpy(),
        train_df['song_listens'].to_numpy(),
    )
)

In [9]:
# Build the validation interaction and weight matrices from
# (user_id, song_id, song_listens) triples.
# The triples are produced by zipping the three column arrays positionally
# instead of looking up df[col][i] row by row: this avoids one label-based
# pandas lookup per value and no longer depends on the frame having a
# default 0..n-1 index.
interactions_val, weights_val = data.build_interactions(
    zip(
        validation_df['user_id'].to_numpy(),
        validation_df['song_id'].to_numpy(),
        validation_df['song_listens'].to_numpy(),
    )
)

In [10]:
# Build the test interaction and weight matrices from
# (user_id, song_id, song_listens) triples.
# The triples are produced by zipping the three column arrays positionally
# instead of looking up df[col][i] row by row: this avoids one label-based
# pandas lookup per value and no longer depends on the frame having a
# default 0..n-1 index.
interactions_test, weights_test = data.build_interactions(
    zip(
        test_df['user_id'].to_numpy(),
        test_df['song_id'].to_numpy(),
        test_df['song_listens'].to_numpy(),
    )
)

In [None]:
# WARP model with alpha = 0.1
# The timer covers both training and validation scoring.
start_warp_1 = time.time()
lfm = LightFM(loss='warp', no_components = 10, user_alpha = 0.1)
# Fit the model that was just created (`lfm`), weighting each training
# interaction by its listen count, so the scoring below sees a fitted model.
lfm = lfm.fit(interactions = interactions_train, sample_weight= weights_train,
              epochs = 1, verbose = True)
# precision@100 on the validation interactions, then its mean.
validation_precision_warp_1 = precision_at_k(lfm, interactions_val, k = 100)
mean_validation_precision_warp_1 = validation_precision_warp_1.mean()
end_warp_1 = time.time()

In [None]:
# Report the mean validation precision and the wall-clock time of the
# WARP (alpha = 0.1) run.
for report_label, report_value in (
    ("Precision for validation set is:", mean_validation_precision_warp_1),
    ("Time spent is:", end_warp_1 - start_warp_1),
):
    print(report_label, report_value)

In [None]:
# BPR model with alpha = 0.1
# The timer covers both training and validation scoring.
start_bpr_1 = time.time()

# One training epoch on the weighted training interactions.
lfm = LightFM(loss='bpr', no_components=10, user_alpha=0.1).fit(
    interactions=interactions_train,
    sample_weight=weights_train,
    epochs=1,
    verbose=True,
)

# precision@100 on the validation interactions, then its mean.
validation_precision_bpr_1 = precision_at_k(lfm, interactions_val, k=100)
mean_validation_precision_bpr_1 = validation_precision_bpr_1.mean()

end_bpr_1 = time.time()

In [None]:
# Report the mean validation precision and the wall-clock time of the
# BPR (alpha = 0.1) run.
for report_label, report_value in (
    ("Precision for validation set is:", mean_validation_precision_bpr_1),
    ("Time spent is:", end_bpr_1 - start_bpr_1),
):
    print(report_label, report_value)

In [None]:
# WARP model with alpha = 1
alpha2 = 1

# The timer covers both training and validation scoring.
start_warp_2 = time.time()

# One training epoch on the weighted training interactions.
lfm = LightFM(loss='warp', no_components=10, user_alpha=alpha2).fit(
    interactions=interactions_train,
    sample_weight=weights_train,
    epochs=1,
    verbose=True,
)

# precision@100 on the validation interactions, then its mean.
validation_precision_warp_2 = precision_at_k(lfm, interactions_val, k=100)
mean_validation_precision_warp_2 = validation_precision_warp_2.mean()

end_warp_2 = time.time()

In [None]:
# Report the mean validation precision and the wall-clock time of the
# WARP (alpha = 1) run.
for report_label, report_value in (
    ("Precision for validation is:", mean_validation_precision_warp_2),
    ("Time spent is:", end_warp_2 - start_warp_2),
):
    print(report_label, report_value)

In [None]:
# WARP model with alpha = 0.001
alpha3 = 0.001

# The timer covers both training and validation scoring.
start_warp_3 = time.time()

# One training epoch on the weighted training interactions.
lfm = LightFM(loss='warp', no_components=10, user_alpha=alpha3).fit(
    interactions=interactions_train,
    sample_weight=weights_train,
    epochs=1,
    verbose=True,
)

# precision@100 on the validation interactions, then its mean.
validation_precision_warp_3 = precision_at_k(lfm, interactions_val, k=100)
mean_validation_precision_warp_3 = validation_precision_warp_3.mean()

end_warp_3 = time.time()

Epoch: 100%|██████████| 1/1 [00:45<00:00, 45.37s/it]


In [None]:
# Report the mean validation precision and the wall-clock time of the
# WARP (alpha = 0.001) run.
for report_label, report_value in (
    ("Precision for validation set is:", mean_validation_precision_warp_3),
    ("Time spent is:", end_warp_3 - start_warp_3),
):
    print(report_label, report_value)

In [None]:
# Optimal WARP model with alpha = 0.001 to compute test accuracy.
# NOTE: this scores whatever model `lfm` currently refers to, i.e. the model
# trained by the most recently executed training cell.
test_precision_warp = precision_at_k(
    lfm,
    interactions_test,
    k=100,
)
mean_test_precision_warp = test_precision_warp.mean()

In [None]:
# Report the mean precision@100 measured on the test interactions.
print(
    "Precision for test set is:",
    mean_test_precision_warp,
)