### **Recommender System using PySpark**

Use the ALS (Alternating Least Squares) algorithm to build a music recommender system that suggests new tracks to users based on the songs they have been listening to.
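
To make the idea behind ALS concrete, here is a minimal rank-1 sketch in plain Python. It is not Spark's implementation: the function name, the toy ratings, and the single latent factor are assumptions chosen for illustration. Each pass holds one side's factors fixed and solves a small regularized least-squares problem for the other side, which for rank 1 has a closed form.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iters=50):
    """Rank-1 alternating least squares on (user, item, rating) triples.

    Illustrative only: one latent factor per user and per item.
    """
    x = [1.0] * n_users  # user factors
    y = [1.0] * n_items  # item factors
    for _ in range(iters):
        # Hold item factors fixed; solve each user's ridge problem in closed form.
        for u in range(n_users):
            obs = [(i, r) for (uu, i, r) in ratings if uu == u]
            x[u] = sum(r * y[i] for i, r in obs) / (sum(y[i] ** 2 for i, _ in obs) + reg)
        # Hold user factors fixed; solve each item's ridge problem the same way.
        for i in range(n_items):
            obs = [(u, r) for (u, ii, r) in ratings if ii == i]
            y[i] = sum(r * x[u] for u, r in obs) / (sum(x[u] ** 2 for u, _ in obs) + reg)
    return x, y


# Toy data (hypothetical): ratings follow user_weight * item_weight with
# user weights 1, 2, 3 and item weights 1, 2; the pair (2, 1) is held out.
observed = [(0, 0, 1.0), (0, 1, 2.0), (1, 0, 2.0), (1, 1, 4.0), (2, 0, 3.0)]
user_f, item_f = als_rank1(observed, n_users=3, n_items=2)

train_error = sum((r - user_f[u] * item_f[i]) ** 2 for u, i, r in observed)
held_out_prediction = user_f[2] * item_f[1]  # the underlying value is 6
```

The held-out prediction is what a recommender cares about: a score for a user and item pair that was never observed. Spark's `ALS` applies the same alternating scheme with several latent factors and distributed solves.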

# **Install libraries, packages and datasets.**

In [1]:
# Install PySpark

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=5c2a85337a1f896824c3646a6685ad9ea2d9f6d1d843d93deaf6f491e1bc7ec4
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit, desc, max
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


Initialize the Spark session.

In [3]:
spark=SparkSession.builder.appName("lastfm").getOrCreate()

Load the listening dataset.

In [4]:
df_listenings=spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Spark/Music Recommender System with PySpark/dataset/listenings.csv', inferSchema=True, header=True)

In [5]:
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      

Drop the `date` column: it has high cardinality and is not needed for the recommender.

In [6]:
df_listenings=df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

Drop rows with null values.

In [7]:
df_listenings=df_listenings.na.drop()
df_listenings.show()

Check the shape of the DataFrame after these modifications.

In [8]:
row_nb=df_listenings.count()
column_nb=len(df_listenings.columns)

print(row_nb, column_nb)

14650594 4


Count the number of times each user has listened to each track.

In [10]:
df_listenings_agg=df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|               Leloo|    1|
| --Seph|         The Embrace|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|               Julia|    1|
| --Seph|In the Nothing of...|    2|
| --Seph|          I Miss You|    1|
| --Seph| The Riders of Rohan|    1|
| --Seph|Sunset Soon Forgo...|    1|
| --Seph|   Barbados Carnival|    1|
| --Seph|      Fragile Meadow|    1|
| --Seph|          Stupid Kid|    1|
| --Seph|Every Direction I...|    2|
| --Seph|         If It Works|    1|
| --Seph|           So Lonely|    2|
| --Seph|    Kiss with a Fist|    1|
| --Seph|             Starman|    2|
| --Seph|         Left Behind|    2|
| --Seph|   Duel of the Fates|    1|
| --Seph|              Campus|    1|
+-------+--------------------+-----+
only showing top 20 rows
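
The counts above are implicit feedback (how often a track was played) rather than explicit ratings. One common way to treat such data, following the implicit-feedback formulation that Spark's ALS documentation cites for `implicitPrefs=True`, is to convert each count into a binary preference plus a confidence weight that grows with the count. The plain-Python sketch below illustrates that transformation; the helper name is hypothetical, the sample rows are copied from the output above, and `alpha` mirrors the ALS parameter of the same name.

```python
def to_preference_and_confidence(count, alpha=1.0):
    """Map a play count to (preference, confidence).

    preference: 1.0 if the track was played at all, else 0.0
    confidence: 1 + alpha * count, so repeated plays carry more weight
    """
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence


# Sample rows taken from the aggregated output above: (user_id, track, count).
rows = [("--Seph", "Leloo", 1), ("--Seph", "Paris 2004", 7), ("--Seph", "Starman", 2)]
weighted = {track: to_preference_and_confidence(n) for _, track, n in rows}
```

Whether switching the model to implicit preferences would improve the recommendations on this dataset is not tested in this notebook.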



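Training on the full dataset is slow, so keep only the first 40,000 aggregated rows to speed things up. Because the aggregate is ordered by `user_id`, this subset covers only the users that sort first rather than a random sample.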

In [12]:
df_listenings_agg=df_listenings_agg.limit(40000)

Convert the `user_id` and `track` columns into numeric indices with `StringIndexer`, since ALS requires numeric user and item IDs.

In [13]:
# One StringIndexer per ID column; the Pipeline fits them, so no separate .fit() call is needed.
indexers=[StringIndexer(inputCol=c, outputCol=c+'_index') for c in ['track', 'user_id']]

pipeline=Pipeline(stages=indexers)
data=pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph|               Leloo|    1|    17498.0|         97.0|
| --Seph|Virus (Luke Fair ...|    1|    29442.0|         97.0|
| --Seph|Airplanes [feat H...|    1|     1774.0|         97.0|
| --Seph|Belina (Original ...|    1|     7311.0|         97.0|
| --Seph|              Monday|    1|     1247.0|         97.0|
| --Seph|Hungarian Dance No 5|    1|    14845.0|         97.0|
| --Seph|       Life On Mars?|    1|     1188.0|         97.0|
| --Seph|  California Waiting|    1|      855.0|         97.0|
| --Seph|       Phantom Pt II|    1|     3651.0|         97.0|
| --Seph|   Summa for Strings|    1|    25748.0|         97.0|
| --Seph|      Hour for magic|    2|    14733.0|         97.0|
| --Seph|Hungarian Rhapsod...|    1|    14846.0|         97.0|
| --Seph|    20 Years of Snow|    1|     1691.0|       
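
By default `StringIndexer` orders labels by descending frequency, so the most frequent value receives index 0.0; in Spark 3.x, ties are broken alphabetically. That is consistent with `--Seph` receiving 97.0 rather than 0.0 in the output above. The plain-Python sketch below mimics the assignment on a handful of made-up values. It is an illustration, not Spark's code, and `frequency_index` is a hypothetical helper.

```python
from collections import Counter


def frequency_index(values):
    """Assign 0.0, 1.0, ... to distinct values, most frequent first.

    Ties are broken alphabetically (assumed to match StringIndexer's
    default 'frequencyDesc' ordering in Spark 3.x).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


# Made-up user_id column: 'carol' appears three times, 'bob' twice, 'alice' once.
user_ids = ["bob", "carol", "alice", "carol", "bob", "carol"]
mapping = frequency_index(user_ids)
```

The indices are dense and start at zero, which is what ALS needs, but they carry no meaning beyond frequency rank.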

In [14]:
data=data.select('user_id_index', 'track_index', 'count').orderBy('user_id_index')

In [15]:
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    31154.0|    1|
|          0.0|    23059.0|    1|
|          0.0|     3583.0|    1|
|          0.0|      609.0|    1|
|          0.0|    16888.0|    1|
|          0.0|    13660.0|    1|
|          0.0|    26304.0|    1|
|          0.0|    28787.0|    1|
|          0.0|    22705.0|    2|
|          0.0|    28191.0|    1|
|          0.0|    16198.0|    1|
|          0.0|    25713.0|    1|
|          0.0|    20323.0|    1|
|          0.0|    10442.0|    1|
|          0.0|    16197.0|    2|
|          0.0|    15350.0|    2|
|          0.0|     8794.0|    1|
|          0.0|    19800.0|    1|
|          0.0|    31054.0|    1|
|          0.0|     5517.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# **Train and test data**

In [21]:
(training, test)=data.randomSplit([0.7,0.3])

Create the model

In [22]:
USERID='user_id_index'
TRACK='track_index'
COUNT='count'

In [23]:
als=ALS(maxIter=5, regParam=0.01, userCol=USERID, itemCol=TRACK, ratingCol=COUNT)

model=als.fit(training)

predictions=model.transform(test)
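
The `predictions` DataFrame is computed but not scored in the cells shown. A typical next step is a root-mean-square error between `count` and `prediction`; in Spark this is usually done with `RegressionEvaluator`. One caveat: with the default `coldStartStrategy`, ALS returns NaN for users or tracks that appear only in the test split, which makes the metric NaN unless those rows are dropped. The plain-Python sketch below shows the calculation and the NaN handling on made-up pairs; `rmse_ignoring_nan` is a hypothetical helper, not a Spark API.

```python
import math


def rmse_ignoring_nan(pairs):
    """RMSE over (actual, predicted) pairs, skipping NaN predictions.

    Skipping NaNs is roughly what ALS's coldStartStrategy='drop' achieves
    before evaluation.
    """
    scored = [(a, p) for a, p in pairs if not math.isnan(p)]
    if not scored:
        raise ValueError("no scorable predictions")
    return math.sqrt(sum((a - p) ** 2 for a, p in scored) / len(scored))


# Made-up (count, prediction) pairs; the NaN stands for a cold-start row.
pairs = [(1, 1.0), (2, 4.0), (1, float("nan")), (7, 5.0)]
score = rmse_ignoring_nan(pairs)
```

With a 70/30 random split of a sparse listening table, cold-start rows are likely, so this handling matters before any score is reported.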

# **Generate the top 10 track recommendations for each user.**

In [25]:
recs=model.recommendForAllUsers(10)

In [26]:
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{3984, 27.154272...|
|            1|[{15838, 10.71552...|
|            2|[{9428, 13.435165...|
|            3|[{15838, 12.97624...|
|            4|[{9428, 19.973269...|
|            5|[{18283, 14.09527...|
|            6|[{3984, 31.650919...|
|            7|[{9428, 22.883905...|
|            8|[{9428, 16.405933...|
|            9|[{26726, 11.30700...|
|           10|[{445, 13.6247225...|
|           11|[{3984, 38.153416...|
|           12|[{342, 11.424215}...|
|           13|[{5999, 8.6203575...|
|           14|[{3984, 64.70802}...|
|           15|[{1603, 10.505446...|
|           16|[{9428, 25.97021}...|
|           17|[{18283, 11.30813...|
|           18|[{9428, 17.126461...|
|           19|[{3984, 61.168205...|
+-------------+--------------------+
only showing top 20 rows



In [27]:
recs.take(1)

[Row(user_id_index=0, recommendations=[Row(track_index=3984, rating=27.154272079467773), Row(track_index=445, rating=8.843757629394531), Row(track_index=520, rating=8.322396278381348), Row(track_index=31645, rating=6.964195251464844), Row(track_index=1083, rating=6.901890277862549), Row(track_index=331, rating=6.800934314727783), Row(track_index=23936, rating=6.64244270324707), Row(track_index=569, rating=6.3976149559021), Row(track_index=77, rating=5.649947166442871), Row(track_index=9428, rating=5.631516933441162)])]
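
The recommendations refer to tracks by `track_index`, which is not readable on its own. `StringIndexer` assigns indices as positions in its fitted label list (exposed as `labels` on the fitted `StringIndexerModel`), so a recommendation can be translated back by list lookup. The sketch below uses a short made-up label list and made-up scores; in the notebook the labels would come from the fitted indexer for `track`, and `decode_recommendations` is a hypothetical helper.

```python
def decode_recommendations(recs, labels):
    """Replace each track index with its label.

    recs:   list of (track_index, score) pairs for one user
    labels: list where position i holds the track name for index i
    """
    return [(labels[int(idx)], score) for idx, score in recs]


# Made-up label list and scores, for illustration only.
track_labels = ["Track A", "Track B", "Track C", "Track D"]
user_recs = [(2, 9.5), (0, 7.25), (3, 6.0)]
readable = decode_recommendations(user_recs, track_labels)
```

Within Spark, the same result can be obtained by joining the exploded recommendations against the distinct `(track, track_index)` pairs.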