## 1. Data Exploration and Pre-processing

In [1]:
'''import libraries'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.mllib.linalg import Matrices
spark = SparkSession.builder.appName('listenBrainz - exploratory data analysis').getOrCreate()
import pandas as pd
import numpy as np
import os
import sys
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType

!pip install pyarrow
!pip install scikit-learn
!pip install implicit

from scipy.sparse import coo_matrix
import scipy.sparse as sparse
import implicit
from implicit.als import AlternatingLeastSquares

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from scipy.sparse import csr_matrix

from pyspark.ml.evaluation import RegressionEvaluator,RankingEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RankingMetrics

23/05/16 22:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


## 2. Import ListenBrainz dataset from given path on the cluster

In [2]:
'''import dataset'''

# small dataset
interactions_train_small = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_train_small.parquet')
tracks_train_small = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/tracks_train_small.parquet')
users_train_small = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/users_train_small.parquet')


# complete dataset
interactions_train = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_train.parquet')
tracks_train = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/tracks_train.parquet')
users_train = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/users_train.parquet')


# test dataset
interactions_test = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_test.parquet')
tracks_test = pd.read_parquet('/scratch/work/courses/DSGA1004-2021/listenbrainz/tracks_test.parquet')

In [3]:
'''print info of pandas dataframes'''

print('interactions_train_small: ', interactions_train_small.info())
print ('*******')
print('tracks_train_small: ', tracks_train_small.info())
print ('*******')
print('users_train_small: ', users_train_small.info())
print ('*******')

print('interactions_train: ', interactions_train.info())
print ('*******')
print('tracks_train: ', tracks_train.info())
print ('*******')


print('interactions_test: ', interactions_test.info())
print ('*******')
print('tracks_test: ', tracks_test.info())
print ('*******')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 51080140 entries, 0 to 51080139
Data columns (total 3 columns):
 #   Column          Dtype         
---  ------          -----         
 0   user_id         Int32         
 1   recording_msid  string        
 2   timestamp       datetime64[ns]
dtypes: Int32(1), datetime64[ns](1), string(1)
memory usage: 1023.0 MB
interactions_train_small:  None
*******
<class 'pandas.core.frame.DataFrame'>
Index: 18969111 entries, 2 to 28704238
Data columns (total 4 columns):
 #   Column          Dtype 
---  ------          ----- 
 0   recording_msid  string
 1   artist_name     string
 2   track_name      string
 3   recording_mbid  string
dtypes: string(4)
memory usage: 723.6 MB
tracks_train_small:  None
*******
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6954 entries, 0 to 6953
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   user_id    6954 non-null   Int32 
 1   user_name  6954

In [4]:
'''get datasets shape'''

print ('interactions_train_small: ', interactions_train_small.shape)
print ('tracks_train_small: ', tracks_train_small.shape)
print('users_train_small: ',users_train_small.shape)

print ('interactions_train: ', interactions_train.shape)
print ('tracks_train: ', tracks_train.shape)

print('interactions_test: ',interactions_test.shape)
print('tracks_test: ',tracks_test.shape)

interactions_train_small:  (51080140, 3)
tracks_train_small:  (18969111, 4)
users_train_small:  (6954, 2)
interactions_train:  (179466123, 3)
tracks_train:  (28704239, 4)
interactions_test:  (50031760, 3)
tracks_test:  (11037988, 4)


## 3. Pre-processing

In [5]:
'''perform the join operation'''

# small dataset
joined_df_small = pd.merge(interactions_train_small, tracks_train_small, on='recording_msid', how='left')
print('joined_df: ',joined_df_small.shape)

# complete dataset
joined_df_train = pd.merge(interactions_train, tracks_train, on='recording_msid', how='left')
print('joined_df_train: ',joined_df_train.shape)

# test dataset
joined_df_test = pd.merge(interactions_test, tracks_test, on='recording_msid', how='left')
print('joined_df_test: ',joined_df_test.shape)

joined_df:  (51080140, 6)
joined_df_train:  (179466123, 6)
joined_df_test:  (50031760, 6)


In [6]:
'''create a dataframe where recording_mbid is present'''

# small dataset
rec_mbid_present_small = joined_df_small[joined_df_small['recording_mbid'].notnull()]
print('rec_mbid_present_small shape: ', rec_mbid_present_small.shape)

# complete dataset
rec_mbid_present_train = joined_df_train[joined_df_train['recording_mbid'].notnull()]
print('rec_mbid_present_train shape: ', rec_mbid_present_train.shape)


# test dataset
rec_mbid_present_test = joined_df_test[joined_df_test['recording_mbid'].notnull()]
print('rec_mbid_present_test shape: ', rec_mbid_present_test.shape)

rec_mbid_present_small shape:  (19226385, 6)
rec_mbid_present_train shape:  (75330453, 6)
rec_mbid_present_test shape:  (13624362, 6)


In [7]:
'''create a dataframe where recording_mbid is null'''

# small dataset
rec_mbid_null_small = joined_df_small[joined_df_small['recording_mbid'].isnull()]
print('rec_mbid_null_small shape: ', rec_mbid_null_small.shape)

# complete dataset
rec_mbid_null_train = joined_df_train[joined_df_train['recording_mbid'].isnull()]
print('rec_mbid_null_train shape: ', rec_mbid_null_train.shape)


# test dataset
rec_mbid_null_test = joined_df_test[joined_df_test['recording_mbid'].isnull()]
print('rec_mbid_null_test shape: ', rec_mbid_null_test.shape)

rec_mbid_null_small shape:  (31853755, 6)
rec_mbid_null_train shape:  (104135670, 6)
rec_mbid_null_test shape:  (36407398, 6)


In [8]:
# small dataset
'''group recording_msid by recording_mbid and get the first recording_msid for each group'''
rec_mbid_present_grouped_small = rec_mbid_present_small.groupby('recording_mbid').agg({'recording_msid': 'first'})

'''rename the column to first_recording_msid'''
rec_mbid_present_grouped_small = rec_mbid_present_grouped_small.rename(columns={'recording_msid': 'first_recording_msid'})

'''merge rec_mbid_present_grouped and rec_mbid_present'''
merged_df_small = rec_mbid_present_small.merge(rec_mbid_present_grouped_small, on='recording_mbid')
print('merged_df_small shape: ', merged_df_small.shape)

# train dataset
rec_mbid_present_grouped_train = rec_mbid_present_train.groupby('recording_mbid').agg({'recording_msid': 'first'})
rec_mbid_present_grouped_train = rec_mbid_present_grouped_train.rename(columns={'recording_msid': 'first_recording_msid'})
merged_df_train = rec_mbid_present_train.merge(rec_mbid_present_grouped_train, on='recording_mbid')
print('merged_df_train shape: ', merged_df_train.shape)

# test dataset
rec_mbid_present_grouped_test = rec_mbid_present_test.groupby('recording_mbid').agg({'recording_msid': 'first'})
rec_mbid_present_grouped_test = rec_mbid_present_grouped_test.rename(columns={'recording_msid': 'first_recording_msid'})
merged_df_test = rec_mbid_present_test.merge(rec_mbid_present_grouped_test, on='recording_mbid')
print('merged_df_test shape: ', merged_df_test.shape)

merged_df_small shape:  (19226385, 7)
merged_df_train shape:  (75330453, 7)
merged_df_test shape:  (13624362, 7)


In [9]:
'''drop recording_msid'''

# small dataset
merged_df_small  = merged_df_small .drop(['recording_msid'], axis=1)
merged_df_small = merged_df_small.rename(columns={'first_recording_msid':'recording_msid'})
print('merged_df_small shape: ', merged_df_small.shape)

# train dataset
merged_df_train  = merged_df_train .drop(['recording_msid'], axis=1)
merged_df_train = merged_df_train.rename(columns={'first_recording_msid':'recording_msid'})
print('merged_df_train shape: ', merged_df_train.shape)

# test dataset
merged_df_test  = merged_df_test .drop(['recording_msid'], axis=1)
merged_df_test = merged_df_test.rename(columns={'first_recording_msid':'recording_msid'})
print('merged_df_test shape: ', merged_df_test.shape)

merged_df_small shape:  (19226385, 6)
merged_df_train shape:  (75330453, 6)
merged_df_test shape:  (13624362, 6)


In [10]:
'''merging dataframes with mbid and without mbid'''

# small dataset
concat_df_small = pd.concat([merged_df_small, rec_mbid_null_small], axis=0)
print('concat_df_small: ',concat_df_small.shape)

# train dataset
concat_df_train = pd.concat([merged_df_train, rec_mbid_null_train], axis=0)
print('concat_df_train: ',concat_df_train.shape)

# test dataset
concat_df_test = pd.concat([merged_df_test, rec_mbid_null_test], axis=0)
print('concat_df_test: ',concat_df_test.shape)

concat_df_small:  (51080140, 6)
concat_df_train:  (179466123, 6)
concat_df_test:  (50031760, 6)


In [11]:
'''check if there is any null recording_msid'''
print(concat_df_small['recording_msid'].isnull().sum())
print(concat_df_train['recording_msid'].isnull().sum())
print(concat_df_test['recording_msid'].isnull().sum())

0
0
0


In [12]:
'''drop recording_mbid'''
concat_df_small = concat_df_small.drop(['recording_mbid'], axis=1)
concat_df_train = concat_df_train.drop(['recording_mbid'], axis=1)
concat_df_test = concat_df_test.drop(['recording_mbid'], axis=1)

In [13]:
'''group DataFrame by user_id and recording_msid, and count number of listens'''
listen_counts_small = concat_df_small.groupby(['user_id', 'recording_msid']).size().reset_index(name='listens')
print('listen_counts_small: ',listen_counts_small.shape)

listen_counts_train = concat_df_train.groupby(['user_id', 'recording_msid']).size().reset_index(name='listens')
print('listen_counts_train: ',listen_counts_train.shape)

listen_counts_test = concat_df_test.groupby(['user_id', 'recording_msid']).size().reset_index(name='listens')
print('listen_counts_test: ',listen_counts_test.shape)

listen_counts_small:  (21922976, 3)
listen_counts_train:  (60912467, 3)
listen_counts_test:  (21037310, 3)


In [14]:
'''print average and median listens'''
print(f"Average listens for listen_counts_small: {listen_counts_small['listens'].mean()}")
print(f"Median listens for listen_counts_small: {listen_counts_small['listens'].median()}")

print(f"Average listens for listen_counts_train: {listen_counts_train['listens'].mean()}")
print(f"Median listens for listen_counts_train: {listen_counts_train['listens'].median()}")

print(f"Average listens for listen_counts_test: {listen_counts_test['listens'].mean()}")
print(f"Median listens for listen_counts_test: {listen_counts_test['listens'].median()}")

Average listens for listen_counts_small: 2.3299820243383014
Median listens for listen_counts_small: 1.0
Average listens for listen_counts_train: 2.9462954275025504
Median listens for listen_counts_train: 1.0
Average listens for listen_counts_test: 2.3782394231962165
Median listens for listen_counts_test: 1.0


In [15]:
'''filter rows with listens less than average'''
filtered_df_small = listen_counts_small[listen_counts_small['listens'] > listen_counts_small['listens'].median() + 3]
filtered_df_train = listen_counts_train[listen_counts_train['listens'] > listen_counts_train['listens'].median() + 3]

In [16]:
# small dataset
user_means_small = filtered_df_small.groupby('user_id')['listens'].mean()
song_means_small = filtered_df_small.groupby('recording_msid')['listens'].mean()

filtered_df_small = filtered_df_small.merge(user_means_small, on='user_id', suffixes=('', '_user_mean'))
filtered_df_small = filtered_df_small.merge(song_means_small, on='recording_msid', suffixes=('', '_song_mean'))
filtered_df_small['listens_centered'] = filtered_df_small['listens'] - filtered_df_small['listens_user_mean'] - filtered_df_small['listens_song_mean']
filtered_df_small = filtered_df_small.drop(['listens_user_mean', 'listens_song_mean'], axis=1)
filtered_df_small = filtered_df_small.rename(columns={'listens_centered':'listens_mean'})
filtered_df_small = filtered_df_small[filtered_df_small['listens_mean'] >= 0]
filtered_df_small['listens_mean'] = filtered_df_small['listens_mean'].round().astype(int)
print('filtered_df_small shape: ',filtered_df_small.shape)

# complete dataset
user_means_train = filtered_df_train.groupby('user_id')['listens'].mean()
song_means_train = filtered_df_train.groupby('recording_msid')['listens'].mean()

filtered_df_train = filtered_df_train.merge(user_means_small, on='user_id', suffixes=('', '_user_mean'))
filtered_df_train = filtered_df_train.merge(song_means_small, on='recording_msid', suffixes=('', '_song_mean'))
filtered_df_train['listens_centered'] = filtered_df_train['listens'] - filtered_df_train['listens_user_mean'] - filtered_df_train['listens_song_mean']
filtered_df_train = filtered_df_train.drop(['listens_user_mean', 'listens_song_mean'], axis=1)
filtered_df_train = filtered_df_train.rename(columns={'listens_centered':'listens_mean'})
filtered_df_train = filtered_df_train[filtered_df_train['listens_mean'] >= 0]
filtered_df_train['listens_mean'] = filtered_df_train['listens_mean'].round().astype(int)
print('filtered_df_train shape: ',filtered_df_train.shape)

filtered_df_small shape:  (52257, 4)
filtered_df_train shape:  (454094, 4)


In [17]:
df1 = pd.Series(filtered_df_small['recording_msid'].unique())
df2 = pd.Series(filtered_df_train['recording_msid'].unique())
df3 = pd.Series(listen_counts_test['recording_msid'].unique())

msid_series = pd.unique(pd.concat([df1, df2, df3]))

In [18]:
'''assign cat codes to recording_msid'''
# Convert union_unique_values to a Pandas Series
union_series = pd.Series(msid_series)

# Assign categorical codes using pd.Categorical()
codes = pd.Categorical(union_series).codes

# Convert the codes array to a Pandas DataFrame
msid = pd.DataFrame({'recording_msid': union_series, 'codes': codes})

In [19]:
filtered_df_small = pd.merge(filtered_df_small, msid, on='recording_msid', how='left')
filtered_df_train = pd.merge(filtered_df_train, msid, on='recording_msid', how='left')
listen_counts_test = pd.merge(listen_counts_test, msid, on='recording_msid', how='left')

In [20]:
print ('filtered_df_small shape: ', filtered_df_small.shape)
filtered_df_small.head(10)

filtered_df_small shape:  (52257, 5)


Unnamed: 0,user_id,recording_msid,listens,listens_mean,codes
0,1166,02aa34a9-28bd-4abe-a3c8-b1a4cd5dd1cf,21,1,105873
1,3159,03b0037a-33eb-4b00-b680-a43445df310c,38,16,146558
2,3159,03cfae38-14e3-47ff-ae24-3982660a6758,42,21,151508
3,16332,0852c051-2b58-472f-9324-95b5c953e61d,24,4,330240
4,1015,09a3a128-b467-4f6c-be8b-72e6c33364f2,152,105,382772
5,22157,09a3a128-b467-4f6c-be8b-72e6c33364f2,36,2,382772
6,2881,0a22d1e6-3706-406d-80a9-7e1ea902793b,37,15,402575
7,1015,0b6c2d5c-47a9-4972-ab11-804db96e6f63,129,91,453577
8,3816,133712d7-8ce3-422b-9429-831a367fc570,29,5,763906
9,20420,133712d7-8ce3-422b-9429-831a367fc570,20,0,763906


In [21]:
print ('filtered_df_train shape: ', filtered_df_train.shape)
filtered_df_train.head(10)

filtered_df_train shape:  (454094, 5)


Unnamed: 0,user_id,recording_msid,listens,listens_mean,codes
0,1,002bd589-8913-4b48-a163-162f03990aae,28,15,6863
1,1,00e1ef0e-e95e-4ba8-af2d-7423c5372c02,57,43,35362
2,1166,013d582c-d436-4d5d-a0b7-15fa8b1bfcf1,45,25,49485
3,1410,013d582c-d436-4d5d-a0b7-15fa8b1bfcf1,65,43,49485
4,3107,013d582c-d436-4d5d-a0b7-15fa8b1bfcf1,25,4,49485
5,4490,013d582c-d436-4d5d-a0b7-15fa8b1bfcf1,65,43,49485
6,1,01905d83-c169-42fc-bb93-d4e8431e9b43,21,6,62333
7,408,02083c1f-88cd-40f5-ba60-b364e4ede644,23,7,80735
8,734,02083c1f-88cd-40f5-ba60-b364e4ede644,39,23,80735
9,1166,02083c1f-88cd-40f5-ba60-b364e4ede644,41,23,80735


In [22]:
print ('listen_counts_test shape: ', listen_counts_test.shape)
listen_counts_test.head(10)

listen_counts_test shape:  (21037310, 4)


Unnamed: 0,user_id,recording_msid,listens,codes
0,1,0003ce41-37c8-4e05-9ffe-9bff615406db,4,590
1,1,0005a3ca-84b8-4420-97a1-12dc9cdcf066,1,888
2,1,000a7629-fdb7-4e5d-bf7d-00f44d373f3f,1,1596
3,1,00200571-21b0-4d8c-9eb4-bb839dbc8bca,6,4979
4,1,00212384-c7d2-424f-a0db-5f1ddc1c3028,2,5165
5,1,00348870-5de1-4326-be81-18fdc3dd43eb,45,8192
6,1,00360680-df59-453c-9d5a-8c8142a22f2f,1,8425
7,1,0038bbaa-0092-4001-89f2-be7111d38d94,1,8863
8,1,00420ab6-ff80-4a0f-83d8-fa45cd5dd44d,3,10335
9,1,0044788b-8a59-4d7a-9066-16e4ea7a2bb1,1,10723


In [23]:
'''split filtered_df into train and test sets'''

# small datadset
train_df_small, val_df_small = train_test_split(filtered_df_small, test_size=0.3, random_state=42)
val1_df_small, val2_df_small = train_test_split(val_df_small, test_size=0.4, random_state=42)
train_df_small = pd.concat([train_df_small, val1_df_small])

'''print shape of train and test sets'''
print(f"Small Train set shape: {train_df_small.shape}")
print(f"Samll Validation set shape: {val_df_small.shape}")

# complete dataset
train_df_train, val_df_train = train_test_split(filtered_df_train, test_size=0.3, random_state=42)
val1_df_train, val2_df_train = train_test_split(val_df_train, test_size=0.4, random_state=42)
train_df_train = pd.concat([train_df_train, val1_df_train])

'''print shape of train and test sets'''
print(f"Train set shape: {train_df_train.shape}")
print(f"Validation set shape: {val_df_train.shape}")

Small Train set shape: (45985, 5)
Samll Validation set shape: (15678, 5)
Train set shape: (399602, 5)
Validation set shape: (136229, 5)


In [24]:
'''Get the common user_ids'''

# small dataset
train_users_small = set(train_df_small['user_id'].unique())
val_users_small = set(val_df_small['user_id'].unique())
common_users_small = train_users_small.intersection(val_users_small)
print('users in small dataset: ', len(train_users_small))
print('common users in small train and validation dataframe: ',len(common_users_small))

# complete dataset
train_users_train = set(train_df_train['user_id'].unique())
val_users_train = set(val_df_train['user_id'].unique())
common_users_train = train_users_train.intersection(val_users_train)
print('users in complete dataset: ', len(train_users_train))
print('common users in train and validation dataframe: ',len(common_users_train))

# test dataset
test_users_train = set(listen_counts_test['user_id'].unique())
common_users_test = train_users_train.intersection(test_users_train)
print('common users in train and test dataframe: ',len(common_users_test))

users in small dataset:  2862
common users in small train and validation dataframe:  2209
users in complete dataset:  4483
common users in train and validation dataframe:  4071
common users in train and test dataframe:  3790


## 4. Save processed dataset

In [25]:
'''save the relevent dataframes'''
train_df_small.to_parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/train_df_small.parquet', compression='snappy')
val_df_small.to_parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/val_df_small.parquet', compression='snappy')

train_df_train.to_parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/train_df_train.parquet', compression='snappy')
val_df_train.to_parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/val_df_train.parquet', compression='snappy')

listen_counts_test.to_parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/test.parquet', compression='snappy')

## 5. Alternating Least Square

In [None]:
'''alternating least square'''
train_df = spark.read.parquet("/home/vr2229/final-project-group-29/final_project/final-project-group-29/train_df_train.parquet")
val_df = spark.read.parquet("/home/vr2229/final-project-group-29/final_project/final-project-group-29/test.parquet")

# train_df = spark.read.parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/train_df_small.parquet')
# val_df = spark.read.parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/val_df_small.parquet')
from pyspark.sql.functions import asc
training_df = train_df.sort(asc("user_id"))
validation_df = val_df.sort(asc("user_id"))

ranks = [20]
regParam = [0.1]
errors = []
min_error = float('inf')
training_df = training_df.repartition(20000)
print('repartition done')

for param in regParam:
    for rank in ranks:
        
        tempALS = ALS(maxIter=20, rank=rank, regParam=param,
              userCol='user_id', itemCol='codes', ratingCol='listens_mean', implicitPrefs = True, numUserBlocks = 10000,
              coldStartStrategy='drop', seed=1234)
        print('als done')
        training_df = training_df.coalesce(20000)

        model = tempALS.fit(training_df)
        print('fitting done')
        
        predictions = model.transform(validation_df)
        evaluator = RegressionEvaluator(metricName='rmse', labelCol='listens', 
                                predictionCol='prediction')
        print('evaluation done')
        rmse = evaluator.evaluate(predictions)
        print('rmse calculated')
        errors.append(rmse)
        print('errors appended')
        print ('For rank {0} and regParam {1} the RMSE is {2}'.format (rank, param, rmse))
        
        if rmse < min_error:
            min_error = rmse
            best_rank = rank
            best_regParam = param
            best_model = model
        
print ('The best model was trained with rank {0} and regParam {1}'.format(best_rank, best_regParam))

In [None]:
'''save als model parameters for fastSearch'''
user_factors = model.userFactors
item_factors = model.itemFactors

user_factors.write.parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/userFactors.parquet')
item_factors.write.parquet('/home/vr2229/final-project-group-29/final_project/final-project-group-29/item_factors.parquet')