<a href="https://colab.research.google.com/github/farastu-who/Music-Recommender-System-Using-Pyspark/blob/main/The_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **WELCOME TO THIS NOTEBOOK**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Let's install pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 64 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 36.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=6f11e6372db2fe44cf62cf524c42f0f63af097c0ce779788a30addb99ef4875b
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [None]:
spark = SparkSession.builder.appName("lastfm").getOrCreate()

# Loading the dataset

In [None]:
file_path = '/content/drive/MyDrive/dataset/listenings.csv'
df_listenings = spark.read.format('csv').option('header', True).option('inferSchema', True).load(file_path)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [None]:
df_listenings = df_listenings.drop('date')
df_listenings.show()


+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
df_listenings = df_listenings.na.drop()
df_listenings.show()


+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
row_numbers = df_listenings.count()
column_numbers = len(df_listenings.columns)
print(row_numbers,column_numbers)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to specific track


In [None]:
df_listenings_agg = df_listenings.select('user_id','track').groupby('user_id','track').agg(count('*')).alias('count').orderBy('user_id')
df_listenings_agg.show()


+-------+--------------------+--------+
|user_id|               track|count(1)|
+-------+--------------------+--------+
| --Seph|Chelsea Hotel - L...|       1|
| --Seph|        Window Blues|       1|
| --Seph|          Paris 2004|       7|
| --Seph|     The Way We Were|       1|
| --Seph|Vestido Estampado...|       1|
| --Seph|   Summa for Strings|       1|
| --Seph|         The Embrace|       1|
| --Seph|      Hour for magic|       2|
| --Seph|Hungarian Rhapsod...|       1|
| --Seph| Air on the G String|       1|
| --Seph|       Life On Mars?|       1|
| --Seph|Belina (Original ...|       1|
| --Seph|Hungarian Dance No 5|       1|
| --Seph|       Phantom Pt II|       1|
| --Seph|               Leloo|       1|
| --Seph|Airplanes [feat H...|       1|
| --Seph|  California Waiting|       1|
| --Seph| White Winter Hymnal|       3|
| --Seph|              Monday|       1|
| --Seph|Virus (Luke Fair ...|       1|
+-------+--------------------+--------+
only showing top 20 rows



In [None]:
row_numbers = df_listenings_agg.count()
column_numbers = len(df_listenings_agg.columns)
print(row_numbers,column_numbers)

9930128 3


In [None]:
df_listenings_agg = df_listenings_agg.limit(20000)


# Let's convert the user id and track columns into unique integers




In [None]:
indexer = [StringIndexer(inputCol=col , outputCol=col+'_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns) - set(['count(1)']))
]

pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)

data.show()

+-------+--------------------+--------+-----------+-------------+
|user_id|               track|count(1)|track_index|user_id_index|
+-------+--------------------+--------+-----------+-------------+
| --Seph| White Winter Hymnal|       3|       59.0|         69.0|
| --Seph|Virus (Luke Fair ...|       1|    15896.0|         69.0|
| --Seph|Airplanes [feat H...|       1|      519.0|         69.0|
| --Seph|Belina (Original ...|       1|     3278.0|         69.0|
| --Seph|              Monday|       1|      334.0|         69.0|
| --Seph|Hungarian Dance No 5|       1|     7558.0|         69.0|
| --Seph|       Life On Mars?|       1|     1161.0|         69.0|
| --Seph|  California Waiting|       1|      197.0|         69.0|
| --Seph|       Phantom Pt II|       1|     1377.0|         69.0|
| --Seph|   Summa for Strings|       1|    13739.0|         69.0|
| --Seph|      Hour for magic|       2|     7495.0|         69.0|
| --Seph|Hungarian Rhapsod...|       1|     7559.0|         69.0|
| --Seph| 

In [None]:
data = data.select('user_id_index','track_index','count(1)').orderBy('user_id_index')

In [None]:
data.show()

+-------------+-----------+--------+
|user_id_index|track_index|count(1)|
+-------------+-----------+--------+
|          0.0|    10943.0|       1|
|          0.0|    11628.0|       2|
|          0.0|     1349.0|       1|
|          0.0|      381.0|       1|
|          0.0|     8692.0|       1|
|          0.0|     6899.0|       1|
|          0.0|    14044.0|       1|
|          0.0|    15513.0|       1|
|          0.0|    11978.0|       2|
|          0.0|    15176.0|       1|
|          0.0|     8305.0|       1|
|          0.0|    13722.0|       1|
|          0.0|    10620.0|       1|
|          0.0|     4424.0|       1|
|          0.0|    16732.0|       1|
|          0.0|    10630.0|       1|
|          0.0|    12169.0|       1|
|          0.0|     4117.0|       1|
|          0.0|    10336.0|       1|
|          0.0|    16829.0|       1|
+-------------+-----------+--------+
only showing top 20 rows



# Train and Test data

In [None]:
(training , test) = data.randomSplit([0.5,0.5])

# Let's Create our Model

In [None]:
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count(1)'

als = ALS(maxIter = 5, regParam = 0.01, userCol = USERID, itemCol = TRACK, ratingCol = COUNT)
model = als.fit(training)

predictions = model.transform(test)



# Generate top 10 Track recommendations for each user

In [None]:
recs = model.recommendForAllUsers(10)

In [None]:
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|          148|[{11941, 29.54029...|
|           31|[{12192, 7.960277...|
|           85|[{11941, 34.83222...|
|          137|[{8821, 12.511218...|
|           65|[{11941, 34.42089...|
|           53|[{6839, 9.998069}...|
|          133|[{10645, 15.99729...|
|           78|[{12892, 5.606186...|
|          108|[{7826, 7.981048}...|
|           34|[{8821, 7.4485316...|
|          101|[{11941, 57.08678...|
|          115|[{8821, 15.690644...|
|          126|[{16909, 18.13120...|
|           81|[{1325, 8.127888}...|
|           28|[{0, 7.519107}, {...|
|           76|[{4619, 11.047966...|
|           26|[{11941, 38.28414...|
|           27|[{11941, 18.61196...|
|           44|[{11941, 30.00521...|
|          103|[{182, 8.634539},...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
recs.take(1)

[Row(user_id_index=148, recommendations=[Row(track_index=11941, rating=29.54029083251953), Row(track_index=8821, rating=13.558658599853516), Row(track_index=0, rating=12.030959129333496), Row(track_index=309, rating=11.750467300415039), Row(track_index=10339, rating=11.506949424743652), Row(track_index=11878, rating=11.506949424743652), Row(track_index=4619, rating=11.506949424743652), Row(track_index=10645, rating=11.506949424743652), Row(track_index=227, rating=11.136035919189453), Row(track_index=3150, rating=10.068580627441406)])]