In [None]:
!pip install pyspark

# Import Modules

In [None]:
import numpy
import pandas as pd
import glob
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sb
import plotly.express as px

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset paths used later in the
# notebook live under this mount point. force_remount=True requests a fresh
# mount even if one already exists.
drive.mount('/content/drive', force_remount=True)

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Build (or reuse) a local Spark session. getOrCreate() returns the already
# running session when this cell is executed again, whereas constructing a
# second SparkContext in the same kernel raises an error.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

# Loading the Dataset

In [None]:
# Locations of the two input CSV files on the mounted Google Drive.
DATASET_DIR = "/content/drive/MyDrive/Dataset"
file_song_data = DATASET_DIR + "/msdsummary.csv"
file_triplet_data = DATASET_DIR + "/kaggle_triplets.csv"

In [None]:
# Read the song CSV: the first row is the header, fields are comma-separated,
# and Spark infers each column's type from the data.
csv_options = dict(sep=',', header=True, inferSchema=True)
song_df = spark.read.csv(file_song_data, **csv_options)
song_df.printSchema()

In [None]:
# Read the triplets CSV through the file_triplet_data path constant instead of
# repeating the literal path, so the location is defined in exactly one place.
triplet_df = spark.read.csv(file_triplet_data, inferSchema=True, header=True, sep=',')
triplet_df.printSchema()

# Pre-processing

In [None]:
# Left-join the song table onto every triplet row by song_id. The copy of
# song_id that comes from song_df is dropped so the column appears only once.
same_song = triplet_df['song_id'] == song_df['song_id']
MSD = triplet_df.join(song_df, on=same_song, how='left').drop(song_df['song_id'])
MSD.show(n=10)

In [None]:
# Narrow the joined frame to the columns used by the rest of the notebook.
kept_columns = ['user_id', 'song_id', 'play_count', 'title', 'release', 'artist_name', 'year']
MSD = MSD.select(*kept_columns)
MSD.show(n=5)

In [None]:
# Size of the full joined data: total rows, then distinct users, then distinct songs.
print(MSD.count())
for id_column in ('user_id', 'song_id'):
  print(MSD.select(id_column).distinct().count())

In [None]:
# Down-sample the joined data to a fixed fraction of the full row count.
# TOTAL_TRIPLETS is the full row count the sample sizes are derived from:
#   1/2 -> 745976, 1/4 -> 372988, 1/8 -> 186494, 1/16 -> 93247, 1/32 -> 46623
TOTAL_TRIPLETS = 1_491_952
SAMPLE_DIVISOR = 32  # keep 1/32 of the rows; use 16, 8, 4 or 2 for a larger sample
# NOTE(review): limit() is applied without an ordering, so which rows are kept
# is not pinned down by this code - confirm that is acceptable.
MSD = MSD.limit(TOTAL_TRIPLETS // SAMPLE_DIVISOR)

In [None]:
# Same three figures after down-sampling: rows, distinct users, distinct songs.
print(MSD.count())
for key_column in ('user_id', 'song_id'):
  print(MSD.select(key_column).distinct().count())

# Queries for data analysis

In [None]:
# Register MSD as the temp view 'playlist' so the SQL queries in the following
# cells can refer to it by that name.
MSD.createOrReplaceTempView('playlist')

In [None]:
# Total play count per (title, artist) pair, largest total first.
query = (
    "SELECT artist_name, title, sum(play_count) AS number_of_total_play "
    "FROM playlist "
    "GROUP BY title, artist_name "
    "ORDER BY sum(play_count) DESC"
)

mostPlayedSongs = spark.sql(query)
mostPlayedSongs.show(n=10)

In [None]:
# Collect the aggregated result into a pandas DataFrame for the pandas/seaborn
# cells that follow. This rebinds mostPlayedSongs from a Spark DataFrame to a
# pandas one, so running this cell twice in a row fails (pandas has no toPandas).
mostPlayedSongs = mostPlayedSongs.toPandas()

In [None]:
# Add a "<title> - <artist>" label and expose the DataFrame index as a regular
# column so it can be used as the x-axis of the bar chart.
song_labels = mostPlayedSongs['title'] + ' - ' + mostPlayedSongs['artist_name']
mostPlayedSongs['song'] = song_labels
mostPlayedSongs['index'] = mostPlayedSongs.index

# First 20 rows; the SQL query already ordered them by total plays, descending.
mostPlayedSongs_frames = mostPlayedSongs.head(20)
mostPlayedSongs_frames

In [None]:
# Bar chart of the 20 rows above: one bar per row index, height = total play count.
bar_graph = sb.barplot(
    data=mostPlayedSongs_frames,
    x='index',
    y='number_of_total_play',
)
bar_graph.set(
    xlabel='Song distribution',
    ylabel='Total Play Count',
    title='Most Played Songs (Top 20)',
)

In [None]:
# Total play count per user. Sorted DESC so that show(10) lists the ten heaviest
# listeners; an ascending sort would list the ten least active users instead.
query = (
    "SELECT user_id, sum(play_count) AS number_of_total_play "
    "FROM playlist "
    "GROUP BY user_id "
    "ORDER BY sum(play_count) DESC"
)
topListeners = spark.sql(query)
topListeners.show(10)

In [None]:
# For each distinct play_count value, count how many rows carry it, ordered by
# play_count ascending. The result is charted as a pie in a later cell.
query = (
    "SELECT play_count, count(*) AS count "
    "FROM playlist "
    "GROUP BY play_count "
    "ORDER BY play_count"
)
playDistribution = spark.sql(query)
playDistribution.show()

In [None]:
# Collect the distribution into a pandas DataFrame for slicing and plotting.
# This rebinds playDistribution from a Spark DataFrame to a pandas one, so the
# cell cannot be executed twice in a row.
playDistribution = playDistribution.toPandas()

In [None]:
# First 20 rows of the distribution (the query ordered rows by play_count ascending).
playDistribution_frames = playDistribution.head(20)
playDistribution_frames

In [None]:
# Donut chart (hole=0.3): one slice per play_count value, sized by its row count.
fig = px.pie(
    playDistribution_frames,
    names='play_count',
    values='count',
    hole=0.3,
    title='Play Count Distributions for the play_counts 1-20',
)
fig.show()

In [None]:
# Sample roughly 10% of the distinct users from the MSD data.
# randomSplit() normalises its weights, so they are written to sum to 1: the
# first weight is then directly the expected fraction of users kept in user1.
USER_SAMPLE_FRACTION = 0.1
user = MSD.select('user_id').distinct()
user1, user2 = user.randomSplit([USER_SAMPLE_FRACTION, 1 - USER_SAMPLE_FRACTION], seed=123)
userCount = user1.count()
print("Users: ", userCount)

In [None]:
# Distinct list of songs: one row per song_id present in the (down-sampled) MSD
# frame, plus the number of such songs.
songs = MSD.select('song_id').distinct()
songCount = songs.count()
print("Number of songs: ", songCount)

In [None]:
# Map each sampled user_id to a dense integer id (0, 1, 2, ...).
# monotonically_increasing_id() only guarantees unique, increasing 64-bit values;
# it encodes the partition number in the high bits, so ids from different
# partitions can fall outside the 32-bit range and overflow or collide once they
# are cast to 'int' for the model. row_number() over a single global ordering
# yields consecutive ids instead.
# The window has no partitionBy, so Spark moves all rows into one partition (and
# warns about it); that is acceptable for a list of distinct ids.
users_df = user1.withColumn('new_userid', row_number().over(Window.orderBy('user_id')) - 1)
users_df.show()

In [None]:
# Map each distinct song_id to a dense integer id (0, 1, 2, ...).
# row_number() over a global ordering is used rather than
# monotonically_increasing_id(): the latter produces unique but non-consecutive
# 64-bit values (partition number in the high bits) that can overflow or collide
# when cast to 'int' for the model.
# The window has no partitionBy, so all rows are processed in one partition;
# that is acceptable for a list of distinct ids.
# Note: this rebinds song_df, which until now held the song CSV data.
song_df = songs.select(
    'song_id',
    (row_number().over(Window.orderBy('song_id')) - 1).alias('new_songid'),
)
song_df.show()

In [None]:
# Cross-join users and songs: one row for every (sampled user, distinct song)
# combination. song_df at this point is the song_id -> new_songid mapping built
# in the previous cell, not the song CSV loaded earlier.
crossJoin = users_df.crossJoin(song_df)
crossJoin.show()

In [None]:
# Number of (user, song) combinations produced by the cross join.
crossJoin.count()

In [None]:
# Attach the MSD columns to every (user, song) pair. Pairs with no matching MSD
# row get nulls, which fillna(0) replaces with 0 in every numeric column.
pair_keys = ['user_id', 'song_id']
df = crossJoin.join(MSD, on=pair_keys, how='left').fillna(0)

# Keep only the three columns the model uses, each cast to int.
# TODO: cut this down in size - there is not enough storage to train on so much data.
model_columns = ('new_userid', 'new_songid', 'play_count')
model_df = df.select(*[df[name].cast('int') for name in model_columns])

# 70/30 train/test split with a fixed seed.
splits = model_df.select(*model_columns).randomSplit([0.7, 0.3], seed=12345)
(train_data, test_data) = splits

# Alternating Least Squares Algorithm (ALS)

In [None]:
from pyspark.ml.recommendation import ALS

# ALS estimator configured for implicit feedback (implicitPrefs=True), using
# play_count as the rating column and the integer user/song ids as keys.
als_params = dict(
    userCol='new_userid',
    itemCol='new_songid',
    ratingCol='play_count',
    rank=10,
    maxIter=10,
    alpha=20,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=True,
    implicitPrefs=True,
)
als = ALS(**als_params)

In [None]:
# Fit the configured ALS estimator on the training split.
model = als.fit(train_data)

# Rank Ordering Error Metric

In [None]:
def ROEM(predictions, userCol='new_userid', itemCol='new_songid', ratingCol='play_count'):
  """Compute the Rank Ordering Error Metric for a set of predictions.

  Within each user, rows are ordered by ``prediction`` descending and given a
  percentile rank in [0, 1], where 0 is the highest prediction. The metric is
  the rating-weighted mean of those ranks::

      sum(rating * percent_rank) / sum(rating)

  Lower values mean rows with large ratings sit nearer the top of each user's
  ranking.

  Args:
    predictions: Spark DataFrame containing ``userCol``, ``ratingCol`` and a
      column named ``prediction``.
    userCol: Name of the column whose values define the ranking groups.
    itemCol: Unused; kept so existing calls remain valid.
    ratingCol: Name of the column holding the observed rating / play count.

  Returns:
    The metric value (numerator divided by denominator).

  Raises:
    ValueError: If ``predictions`` has no rows or the ratings sum to zero, in
      which case the metric is undefined.

  Side effects:
    Registers (or replaces) the temp views ``predictions`` and ``rankings``.
  """
  # Local imports keep the function independent of the notebook's star import
  # and avoid relying on a global ``spark`` session for string-built SQL.
  from pyspark.sql import functions as F
  from pyspark.sql.window import Window

  # Kept for backward compatibility with code that queries this view afterwards.
  predictions.createOrReplaceTempView('predictions')

  denominator = predictions.agg(F.sum(F.col(ratingCol))).collect()[0][0]
  if not denominator:
    # None -> no rows at all; 0 -> every rating is zero. Either way the ratio is undefined.
    raise ValueError(
        "ROEM is undefined: '" + ratingCol + "' sums to zero or predictions is empty")

  # Percentile rank of each row within its user, highest prediction first.
  per_user = Window.partitionBy(userCol).orderBy(F.col('prediction').desc())
  rankings = predictions.select(
      F.col(userCol),
      F.col(ratingCol),
      F.percent_rank().over(per_user).alias('rank'),
  )
  # Kept for backward compatibility with code that queries this view afterwards.
  rankings.createOrReplaceTempView('rankings')

  numerator = rankings.agg(F.sum(F.col(ratingCol) * F.col('rank'))).collect()[0][0]

  return numerator / denominator

In [None]:
# Score the held-out split with the fitted model. ROEM reads the resulting
# 'prediction' column.
# NOTE(review): with coldStartStrategy='drop' on the estimator, rows the model
# cannot score are presumably omitted here - confirm against the ALS docs.
predictions = model.transform(test_data)

In [None]:
# Evaluate the predictions on the test split with ROEM (default column names)
# and print the resulting score.
validation_performance = ROEM(predictions)
print(validation_performance)