# **MUSIC RECOMMENDER SYSTEM USING PYSPARK** 

Installing pyspark

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 59.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=414af3ded2b2b222ef4fac018735a82268aa8803c89bdb394520a83e7b95e739
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


Importing the modules

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [42]:
spark = SparkSession.builder.appName('rec').getOrCreate()

# Loading the dataset

In [7]:
filepath = '/content/drive/MyDrive/Colab Notebooks/Music Recommender System Using PySpark Dataset/listenings.csv'
df_listenings = spark.read.format('csv').option('header', True).option('InferSchema', True).load(filepath)
# When True, Spark will automatically infer the schema of each column


In [8]:
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Data cleaning 

In [9]:
df_listenings = df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [10]:
df_listenings = df_listenings.na.drop()

In [11]:
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [12]:
rows = df_listenings.count()
cols = len(df_listenings.columns)
print ("Rows: ",rows, " and Columns: ", cols)

Rows:  13758905  and Columns:  4



# Performing some aggregation
to see how many times each user has listened to specific track


In [14]:
df_listenings_agg = df_listenings.select('user_id','track').groupby('user_id','track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|               Leloo|    1|
| --Seph|         The Embrace|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|               Julia|    1|
| --Seph|In the Nothing of...|    2|
| --Seph|          I Miss You|    1|
| --Seph| The Riders of Rohan|    1|
| --Seph|Sunset Soon Forgo...|    1|
| --Seph|   Barbados Carnival|    1|
| --Seph|      Fragile Meadow|    1|
| --Seph|          Stupid Kid|    1|
| --Seph|Every Direction I...|    2|
| --Seph|         If It Works|    1|
| --Seph|           So Lonely|    2|
| --Seph|    Kiss with a Fist|    1|
| --Seph|             Starman|    2|
| --Seph|         Left Behind|    2|
| --Seph|   Duel of the Fates|    1|
| --Seph|       Pressure Drop|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [15]:
rows = df_listenings_agg.count()
cols = len(df_listenings_agg.columns)
print ("Rows: ",rows, " and Columns: ", cols)

Rows:  9930128  and Columns:  3


In [16]:
df_listenings_agg = df_listenings_agg.limit(20000)

# Converting the user id and track columns into unique integers




In [18]:
indexer = [StringIndexer(inputCol=col,outputCol=col+"_Index").fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns)-set('count'))]

In [19]:
pipeline = Pipeline(stages=indexer)

In [20]:
data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)

In [21]:
data.show()

+-------+--------------------+-----+-----------+-------------+-----------+
|user_id|               track|count|track_Index|user_id_Index|count_Index|
+-------+--------------------+-----+-----------+-------------+-----------+
| --Seph|          Nightmares|    1|    10600.0|         69.0|        0.0|
| --Seph|Virus (Luke Fair ...|    1|    15893.0|         69.0|        0.0|
| --Seph|Airplanes [feat H...|    1|      521.0|         69.0|        0.0|
| --Seph|Belina (Original ...|    1|     3280.0|         69.0|        0.0|
| --Seph|              Monday|    1|      334.0|         69.0|        0.0|
| --Seph|Hungarian Dance No 5|    1|     7555.0|         69.0|        0.0|
| --Seph|       Life On Mars?|    1|     1164.0|         69.0|        0.0|
| --Seph|  California Waiting|    1|      195.0|         69.0|        0.0|
| --Seph|       Phantom Pt II|    1|     1378.0|         69.0|        0.0|
| --Seph|   Summa for Strings|    1|    13737.0|         69.0|        0.0|
| --Seph|      Hour for m

# Train and Test data

In [51]:
training, testing = data.limit(100).randomSplit([0.7, 0.3])


In [52]:
training.show(5)

+-------+------------+-----+-----------+-------------+-----------+
|user_id|       track|count|track_Index|user_id_Index|count_Index|
+-------+------------+-----+-----------+-------------+-----------+
| --Seph|        0040|    1|     1925.0|         69.0|        0.0|
| --Seph|   2 and 2=5|    1|     2024.0|         69.0|        0.0|
| --Seph|     401 Lwa|    2|     2077.0|         69.0|        1.0|
| --Seph|5 Years Time|    1|     2091.0|         69.0|        0.0|
| --Seph| Agoraphobia|    1|     2427.0|         69.0|        0.0|
+-------+------------+-----+-----------+-------------+-----------+
only showing top 5 rows



# Creating the Model

In [53]:
USERID = 'user_id_Index'
TRACK = 'track_Index'
COUNT = 'count'

als= ALS(maxIter=5, regParam=0.01, userCol=USERID, itemCol=TRACK, ratingCol=COUNT)



In [47]:
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [54]:
model= als.fit(training)

In [55]:
predictions = model.transform(testing)


# Generating the top 10 track recommendations for each user

In [56]:
recs = model.recommendForAllUsers(10)

In [57]:
recs.show()

+-------------+--------------------+
|user_id_Index|     recommendations|
+-------------+--------------------+
|           69|[{11311, 6.970824...|
+-------------+--------------------+



In [58]:
recs.take(1)


[Row(user_id_Index=69, recommendations=[Row(track_Index=11311, rating=6.970823764801025), Row(track_Index=10481, rating=2.9874961376190186), Row(track_Index=13228, rating=2.9874961376190186), Row(track_Index=2077, rating=1.991663932800293), Row(track_Index=5790, rating=1.991663932800293), Row(track_Index=879, rating=1.991663932800293), Row(track_Index=7492, rating=1.991663932800293), Row(track_Index=14689, rating=1.991663932800293), Row(track_Index=2431, rating=0.9958319664001465), Row(track_Index=10600, rating=0.9958319664001465)])]