# **WELCOME TO THIS NOTEBOOK**

Let's install pyspark

In [None]:
!pip install pyspark



Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the Spark session


In [None]:
# Start the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('lastfm')
    .getOrCreate()
)

# Loading the dataset

In [None]:
# Location of the listenings CSV file
file_path = '/content/listenings.csv'

# Read the CSV into a DataFrame: the first row is treated as the header and
# column types are inferred from the data
df_listenings = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(file_path)
)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [None]:
# Remove the 'date' column, then preview the remaining columns
columns_to_drop = ['date']
df_listenings = df_listenings.drop(*columns_to_drop)
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Discard every row that has a null in any column, then preview the result
df_listenings = df_listenings.na.drop(how='any')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Print the DataFrame's shape as "<rows> <columns>"
column_numbers = len(df_listenings.columns)
row_numbers = df_listenings.count()
print(row_numbers, column_numbers)

13865391 4



# Let's perform some aggregation
to see how many times each user has listened to each track


In [None]:
# Count the rows for each (user_id, track) pair and sort the result by user_id
group_keys = ['user_id', 'track']
df_listenings_agg = (
    df_listenings
    .select(*group_keys)
    .groupby(*group_keys)
    .agg(count('*').alias('count'))
    .orderBy('user_id')
)
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|        Window Blues|    1|
| --Seph|          Paris 2004|    7|
| --Seph|     The Way We Were|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|               Leloo|    1|
| --Seph|         The Embrace|    1|
| --Seph|      Hour for magic|    2|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph| Air on the G String|    1|
| --Seph|              Monday|    1|
| --Seph|Belina (Original ...|    1|
| --Seph|   Summa for Strings|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|       Phantom Pt II|    1|
| --Seph|  California Waiting|    1|
| --Seph|Hungarian Dance No 5|    1|
| --Seph|       Life On Mars?|    1|
| --Seph|Airplanes [feat H...|    1|
| --Seph|Virus (Luke Fair ...|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [None]:
# Print the aggregated DataFrame's shape as "<rows> <columns>"
column_numbers = len(df_listenings_agg.columns)
row_numbers = df_listenings_agg.count()
print(row_numbers, column_numbers)

# Let's convert the user id and track columns into unique integers




In [None]:
# Keep at most 20000 aggregated rows so the following steps work on a smaller dataset
max_rows = 20000
df_listenings_agg = df_listenings_agg.limit(max_rows)

In [None]:
# Index every column except 'count', in the DataFrame's own column order.
# Building this list from a set difference made the stage order arbitrary between
# runs, and naming the loop variable `col` hid the `col` function imported from
# pyspark.sql.functions inside the comprehension.
columns_to_index = [name for name in df_listenings_agg.columns if name != 'count']

# One fitted StringIndexer per column; each writes its result to '<column>_index'
indexer = [
    StringIndexer(inputCol=name, outputCol=name + '_index').fit(df_listenings_agg)
    for name in columns_to_index
]

# The stages are already-fitted models, so the pipeline applies them in list order
pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

In [None]:
# Keep only the two index columns plus the count, sorted by user index
wanted_columns = ['user_id_index', 'track_index', 'count']
data = data.select(*wanted_columns).orderBy('user_id_index')
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|     2369.0|    2|
|          0.0|     4116.0|    1|
|          0.0|    14314.0|    1|
|          0.0|       34.0|    3|
|          0.0|    13211.0|    1|
|          0.0|     5348.0|    1|
|          0.0|    11192.0|    1|
|          0.0|     2238.0|    1|
|          0.0|     9397.0|    1|
|          0.0|     2862.0|    1|
|          0.0|     2661.0|    4|
|          0.0|     6067.0|    1|
|          0.0|     9996.0|    1|
|          0.0|     3336.0|    1|
|          0.0|    10623.0|    1|
|          0.0|    12169.0|    1|
|          0.0|    13536.0|    1|
|          0.0|      789.0|    1|
|          0.0|    11565.0|    1|
|          0.0|       68.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Train and Test data

In [None]:
# Split the rows into two halves: one for training, one for testing.
# Without a seed the split differs on every run; a fixed seed keeps it repeatable
# for the same input data.
SPLIT_SEED = 42
(training, test) = data.randomSplit([0.5, 0.5], seed=SPLIT_SEED)

# Let's Create our Model

In [56]:
#create some constants
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'
SEED = 42  # fixed so the model's random initialisation is repeatable between runs

#create model
als = ALS(maxIter=5,
          regParam=0.01,
          userCol=USERID,
          itemCol=TRACK,
          ratingCol=COUNT,
          # Users or tracks that occur only in the test split have no learned
          # factors; 'drop' removes those rows from transform() output instead
          # of returning NaN predictions for them.
          coldStartStrategy='drop',
          seed=SEED)
# NOTE(review): the rating column is a listen count rather than an explicit rating;
# consider whether implicitPrefs=True is the better fit — confirm before changing.

#fit
model = als.fit(training)

#predict
predictions = model.transform(test)


# Generate top 10 Track recommendations for each user

In [60]:
# Ask the fitted model for the top 10 track recommendations for every user
num_recommendations = 10
recs = model.recommendForAllUsers(num_recommendations)

In [61]:
# Print a preview of the per-user recommendations table
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|          148|[{1735, 10.128324...|
|           31|[{177, 9.897927},...|
|           85|[{11848, 5.9797},...|
|          137|[{15424, 11.05808...|
|           65|[{4459, 17.209393...|
|           53|[{15789, 11.95762...|
|          133|[{14822, 17.99633...|
|           78|[{10665, 7.97987}...|
|          108|[{8174, 7.989826}...|
|           34|[{4459, 14.024325...|
|          101|[{16911, 17.12249...|
|          115|[{370, 8.946846},...|
|          126|[{129, 11.913958}...|
|           81|[{177, 7.8113375}...|
|           28|[{1735, 6.818796}...|
|           76|[{13104, 10.54229...|
|           26|[{15424, 8.739826...|
|           27|[{4459, 4.4456778...|
|           44|[{102, 7.7473755}...|
|          103|[{1735, 6.799515}...|
+-------------+--------------------+
only showing top 20 rows



In [62]:
# Fetch the first row to inspect one user's full recommendations list
recs.take(1)

[Row(user_id_index=148, recommendations=[Row(track_index=1735, rating=10.128324508666992), Row(track_index=461, rating=8.672374725341797), Row(track_index=2242, rating=6.055615425109863), Row(track_index=8036, rating=5.641534805297852), Row(track_index=370, rating=5.204923152923584), Row(track_index=359, rating=4.93851375579834), Row(track_index=15712, rating=4.541712284088135), Row(track_index=10219, rating=4.541712284088135), Row(track_index=2455, rating=4.3719329833984375), Row(track_index=129, rating=4.188811779022217)])]