In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=7f5bf3737599f06b6d537ca72cc7f06e4c3f0c211f3290bfb0b627671c1de37a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Creating the spark session


In [3]:
spark = SparkSession.builder.appName("favmusic").getOrCreate()

# Loading the dataset

In [5]:
file_path ="/content/listenings.csv"
df_listenings = spark.read.csv(file_path, header = True, inferSchema=True)

In [6]:
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Data Cleaning

In [8]:
# delete date column
df_listenings = df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [9]:
#delete nan values
df_listenings = df_listenings.na.drop()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [10]:
rows = df_listenings.count()
cols = len(df_listenings.columns)
print (rows, cols)

54315 4



# Data aggregation
to see how many times each user has listened to specific track


In [11]:
df_listenings_agg = df_listenings.select('user_id','track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-----------+--------------------+-----+
|    user_id|               track|count|
+-----------+--------------------+-----+
|000Silenced|  Could This Be Real|    1|
|000Silenced|Remember (ESCM 12...|    1|
|000Silenced| The Nephilim Rising|    1|
|000Silenced|    Away In A Manger|    1|
|000Silenced|     Rock This Place|    1|
|000Silenced|            Red Heat|    1|
|000Silenced|         Conquer All|    1|
|000Silenced|We Love Bass (Dub...|    1|
|000Silenced|            Transway|    1|
|000Silenced|               Rhino|    1|
|000Silenced|      Caramelldansen|    2|
|000Silenced|Don't Stop The Mu...|    1|
|000Silenced|         Machine Gun|    1|
|000Silenced|Corrected (Freaky...|    1|
|000Silenced|   I'm Not Invisible|    1|
|000Silenced|    Just So You Know|    1|
|000Silenced|Soul Purge (Featu...|    1|
|000Silenced|Run To You (BT vs...|    1|
|000Silenced|Anomaly (Calling ...|    1|
|000Silenced|    Won't Go Quietly|    1|
+-----------+--------------------+-----+
only showing top

# Convert user_id and track columns into unique integers




In [12]:
indexer = [StringIndexer(inputCol=col ,outputCol= col + '_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns) - set(['count']))]

pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-----------+--------------------+-----+-----------+-------------+
|    user_id|               track|count|track_index|user_id_index|
+-----------+--------------------+-----+-----------+-------------+
|000Silenced|      Post(?)organic|    1|    21874.0|         13.0|
|000Silenced|    Away In A Manger|    1|     6832.0|         13.0|
|000Silenced|Soul Purge (Featu...|    1|    24813.0|         13.0|
|000Silenced|            Red Heat|    1|    22517.0|         13.0|
|000Silenced|            Headknot|    1|     2811.0|         13.0|
|000Silenced|We Love Bass (Dub...|    1|    29537.0|         13.0|
|000Silenced|Anomaly (Calling ...|    1|     6426.0|         13.0|
|000Silenced|               Rhino|    1|    22701.0|         13.0|
|000Silenced| Let the Story Begin|    1|    17550.0|         13.0|
|000Silenced|Don't Stop The Mu...|    1|    10855.0|         13.0|
|000Silenced|   Can't Tek No More|    1|     2152.0|         13.0|
|000Silenced|               Money|    1|      282.0|         1

In [13]:
data = data.select('user_id_index','track_index', 'count').orderBy('user_id_index')
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    10473.0|    1|
|          0.0|       22.0|    3|
|          0.0|    16200.0|    2|
|          0.0|    15360.0|    2|
|          0.0|     8863.0|    1|
|          0.0|    19656.0|    1|
|          0.0|    30715.0|    1|
|          0.0|     5586.0|    1|
|          0.0|    27285.0|    1|
|          0.0|    22831.0|    1|
|          0.0|    21565.0|    1|
|          0.0|    17074.0|    1|
|          0.0|    25706.0|    1|
|          0.0|       23.0|    1|
|          0.0|    29046.0|    1|
|          0.0|    26811.0|    1|
|          0.0|     9406.0|    1|
|          0.0|    26268.0|    3|
|          0.0|      538.0|    1|
|          0.0|    25781.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Split data into Train and Test dataset

In [14]:
(train, test) = data.randomSplit([0.5,0.5])

# Creating Model

In [15]:
als = ALS(maxIter=5 , regParam= 0.01 , userCol='user_id_index' , itemCol= 'track_index' , ratingCol = 'count')
model = als.fit(train)

prediction = model.transform(test)


# Generate top 10 Track recommendations for each user

In [16]:
recs = model.recommendForAllUsers(10)
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{26215, 12.27636...|
|            1|[{818, 9.418161},...|
|            2|[{26455, 9.837048...|
|            3|[{26215, 13.05875...|
|            4|[{748, 11.068879}...|
|            5|[{5678, 11.184947...|
|            6|[{5678, 24.193129...|
|            7|[{5678, 10.070045...|
|            8|[{5678, 8.953505}...|
|            9|[{26215, 12.26219...|
|           10|[{536, 5.939341},...|
|           11|[{9484, 14.072751...|
|           12|[{4096, 7.7516155...|
|           13|[{9484, 23.538061...|
|           14|[{5678, 28.245367...|
|           15|[{26455, 7.383555...|
|           16|[{9484, 15.812599...|
|           17|[{4399, 8.770046}...|
|           18|[{11534, 10.71259...|
|           19|[{2512, 14.102952...|
+-------------+--------------------+
only showing top 20 rows



In [17]:
recs.take(1)

[Row(user_id_index=0, recommendations=[Row(track_index=26215, rating=12.276359558105469), Row(track_index=748, rating=9.136616706848145), Row(track_index=26455, rating=8.908474922180176), Row(track_index=4096, rating=6.637484073638916), Row(track_index=26581, rating=5.157538414001465), Row(track_index=25199, rating=4.990167617797852), Row(track_index=1483, rating=4.953574180603027), Row(track_index=219, rating=4.900288105010986), Row(track_index=4683, rating=4.586238861083984), Row(track_index=14091, rating=4.168476104736328)])]