<a href="https://colab.research.google.com/github/ciciwu/recsys_playground/blob/main/pyspark/Music_Recs_ALS_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **WELCOME TO THIS NOTEBOOK**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Let's install pyspark

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=9db7a3dabab9a07b6909550dd12f4c93d9ffa297de16e58b82cb9e4f35e9baf7
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


Importing the modules

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [4]:
spark = SparkSession.builder.appName("lastfm").getOrCreate()

# Loading the dataset

In [5]:
file_path = '/content/drive/MyDrive/content/music_rec/listenings.csv'
df_listenings = spark.read.format('csv').option('header',True).option('inferSchema',True).load(file_path) #data frame, header will infer column types from csv
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables

In [6]:
df_listenings = df_listenings.drop('date')#drop date and time col from data frame
df_listenings.show() #has information about each user, which track, artist and album they listened to

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [7]:
df_listenings = df_listenings.na.drop() # removes null values in the row
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [8]:
rows = df_listenings.count()
cols = len(df_listenings.columns)
print(rows,cols)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to specific track


In [9]:
#in order to make a model, we need to know how many times a user has listened to each song
df_listenings_agg = df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph| White Winter Hymnal|    3|
| --Seph|         The Funeral|    1|
| --Seph|Hope There's Someone|    1|
| --Seph|         The Painter|    1|
| --Seph|          Je te veux|    1|
| --Seph|            War Pigs|    1|
| --Seph|                 F12|    1|
| --Seph|                Team|    1|
| --Seph|          Nightmares|    1|
| --Seph|               Radio|    1|
| --Seph|   All I Want Is You|    1|
| --Seph|    Little by Little|    2|
| --Seph|        After Nature|    1|
| --Seph|In the Hall of th...|    1|
| --Seph|   Hey There Delilah|    1|
| --Seph|   Let's Call It Off|    1|
| --Seph|               Leloo|    1|
| --Seph|             Pack Up|    1|
| --Seph|           Introitus|    1|
| --Seph|        The Leanover|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [10]:
row = df_listenings_agg.count()
col = len(df_listenings_agg.columns)
print(row,col)

9930128 3


In [11]:
#we want to decrement our row size, its almost 10 million, lets reduce it inorder to process our data much faster
df_listenings_agg = df_listenings_agg.limit(20000)


# Let's convert the user id and track columns into unique integers




In [12]:
#We want to use StringIndexer to convert userID and track to unique, integer values
#StringIndexer encodes a string column of labels to a column of label indices
old_strindexer = [StringIndexer(inputCol = col, outputCol = col + '_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns)- set(['count']))]
indexer = [curr_strindexer.setHandleInvalid("keep") for curr_strindexer in old_strindexer]
pipeline = Pipeline(stages = indexer)
#the dataframe will be called data
#Transformers convert one dataframe into another either by updating the current values of a particular column (like converting categorical columns to numeric) or mapping it to some other values by using a defined logic.
#An Estimator implements the fit() method on a dataframe and produces a model.
data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph|          Nightmares|    1|    10600.0|         69.0|
| --Seph|Virus (Luke Fair ...|    1|    15893.0|         69.0|
| --Seph|Airplanes [feat H...|    1|      521.0|         69.0|
| --Seph|Belina (Original ...|    1|     3280.0|         69.0|
| --Seph|              Monday|    1|      334.0|         69.0|
| --Seph|Hungarian Dance No 5|    1|     7555.0|         69.0|
| --Seph|       Life On Mars?|    1|     1164.0|         69.0|
| --Seph|  California Waiting|    1|      195.0|         69.0|
| --Seph|       Phantom Pt II|    1|     1378.0|         69.0|
| --Seph|   Summa for Strings|    1|    13737.0|         69.0|
| --Seph|      Hour for magic|    2|     7492.0|         69.0|
| --Seph|Hungarian Rhapsod...|    1|     7556.0|         69.0|
| --Seph|     The Way We Were|    1|    14958.0|       

In [13]:
data = data.select('user_id_index', 'track_index', 'count').orderBy('user_id_index')



In [None]:
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    10628.0|    1|
|          0.0|     3338.0|    1|
|          0.0|    12168.0|    1|
|          0.0|    11626.0|    2|
|          0.0|    10094.0|    4|
|          0.0|      427.0|    1|
|          0.0|    16878.0|    1|
|          0.0|    11722.0|    1|
|          0.0|    15074.0|    1|
|          0.0|     1359.0|    1|
|          0.0|     5874.0|    1|
|          0.0|    11184.0|    1|
|          0.0|     2372.0|    2|
|          0.0|    14316.0|    1|
|          0.0|     5346.0|    1|
|          0.0|    11194.0|    1|
|          0.0|     2241.0|    1|
|          0.0|     2864.0|    1|
|          0.0|     2663.0|    4|
|          0.0|     6064.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Train and Test data

In [14]:
(training, test) = data.randomSplit([0.5,0.5])

# Let's Create our Model

In [15]:
USERID = "user_id_index"
TRACK = "track_index"
COUNT = "count"

als = ALS(maxIter = 5, regParam = 0.01, userCol = USERID, itemCol = TRACK, ratingCol = COUNT)
# Alternating Least Squares algorithm
print(als)
model = als.fit(training)

predictions = model.transform(test)

ALS_f6975171dcec


In [17]:
predictions.show(10)

+-------------+-----------+-----+-------------+
|user_id_index|track_index|count|   prediction|
+-------------+-----------+-----+-------------+
|          0.0|        1.0|    1|    1.9426332|
|          0.0|       34.0|    3|    2.8877683|
|          0.0|       64.0|    1|   -1.0375698|
|          0.0|       72.0|    1|  -0.23220444|
|          0.0|       90.0|    1|-0.0019499063|
|          0.0|      114.0|    1|          NaN|
|          0.0|      157.0|    1| -0.008078817|
|          0.0|      352.0|    1|          NaN|
|          0.0|      382.0|    1|   0.07665002|
|          0.0|      385.0|    1|          NaN|
+-------------+-----------+-----+-------------+
only showing top 10 rows



In [27]:
predictions=predictions.na.drop()

In [28]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="count",
predictionCol="prediction")

# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(predictions)

# Print the RMSE
print (RMSE)

2.6313080814199052



# Generate top 10 Track recommendations for each user

In [None]:
recs = model.recommendForAllUsers(10)

In [None]:
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{4460, 17.55068}...|
|            1|[{4460, 14.504824...|
|            2|[{1808, 7.1139107...|
|            3|[{3345, 9.5855255...|
|            4|[{7847, 5.9461546...|
|            5|[{3345, 8.19309},...|
|            6|[{11940, 35.23830...|
|            7|[{4460, 14.913336...|
|            8|[{461, 11.323953}...|
|            9|[{9498, 8.91315},...|
|           10|[{84, 15.409278},...|
|           11|[{7847, 11.740087...|
|           12|[{11940, 19.40975...|
|           13|[{7847, 7.0682983...|
|           14|[{4460, 30.73787}...|
|           15|[{4460, 25.294}, ...|
|           16|[{11940, 18.11632...|
|           17|[{16968, 9.095592...|
|           18|[{11940, 9.351403...|
|           19|[{11940, 43.04874...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
recs.take(1)

[Row(user_id_index=0, recommendations=[Row(track_index=4460, rating=17.55068016052246), Row(track_index=1694, rating=14.235849380493164), Row(track_index=120, rating=11.060724258422852), Row(track_index=11940, rating=9.11754322052002), Row(track_index=180, rating=8.364152908325195), Row(track_index=4461, rating=8.100314140319824), Row(track_index=1439, rating=7.11166524887085), Row(track_index=10815, rating=6.750261306762695), Row(track_index=524, rating=6.750261306762695), Row(track_index=14378, rating=5.52789306640625)])]