# **WELCOME TO THIS NOTEBOOK**

In [2]:
# Make Google Drive visible inside the Colab VM; the dataset is read from it below.
from google.colab import drive

mount_point = '/content/drive'
drive.mount(mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Let's install PySpark.

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 38.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=aa47333259b2dbe7f2bb3e0677bfdcb1a948f512c354c96d1f994ae34b1070bb
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


Importing the modules

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the Spark session


In [5]:
# Start a Spark session named "lastfm", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("lastfm")
    .getOrCreate()
)

# Loading the dataset

In [6]:
# Read the listenings CSV from Drive: the first row holds the column names and
# column types are inferred from the data.
file_path = "/content/drive/MyDrive/dataset/dataset/listenings.csv"
df_listenings = spark.read.csv(file_path, header=True, inferSchema=True)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [7]:
# The 'date' column is not used by any later step, so remove it.
columns_to_remove = ['date']
df_listenings = df_listenings.drop(*columns_to_remove)
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [8]:
# Discard every row that has a null in any column.
df_listenings = df_listenings.dropna()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [10]:
# Shape of the cleaned data: number of rows, then number of columns.
row_count, col_count = df_listenings.count(), len(df_listenings.columns)
print(row_count, col_count)

13758905 4



# Let's perform some aggregation
to see how many times each user has listened to each track.


In [12]:
# One row per (user_id, track) pair with the number of listens in a 'count' column,
# sorted by user_id.
df_listenings_agg = (
    df_listenings
    .groupBy('user_id', 'track')
    .count()
    .orderBy('user_id')
)
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|               Leloo|    1|
| --Seph|         The Embrace|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|               Julia|    1|
| --Seph|In the Nothing of...|    2|
| --Seph|          I Miss You|    1|
| --Seph| The Riders of Rohan|    1|
| --Seph|Sunset Soon Forgo...|    1|
| --Seph|   Barbados Carnival|    1|
| --Seph|      Fragile Meadow|    1|
| --Seph|          Stupid Kid|    1|
| --Seph|Every Direction I...|    2|
| --Seph|         If It Works|    1|
| --Seph|           So Lonely|    2|
| --Seph|    Kiss with a Fist|    1|
| --Seph|             Starman|    2|
| --Seph|         Left Behind|    2|
| --Seph|   Duel of the Fates|    1|
| --Seph|       Pressure Drop|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [13]:
# Number of rows in the aggregated frame, i.e. distinct (user_id, track) pairs.
agg_row_count = df_listenings_agg.count()
print(agg_row_count)

9930128


In [14]:
# Cap the aggregated data at 50,000 rows (presumably to keep the later indexing
# and training steps fast - confirm before using the full dataset).
sample_size = 50000
df_listenings_agg = df_listenings_agg.limit(sample_size)

# Let's convert the user id and track columns into unique integers




In [15]:
# Encode the string identifier columns as numeric indices, producing '<name>_index'
# columns for the model below.
# The columns are listed explicitly instead of being derived from a set difference,
# so the stage order and the output column order are identical on every run, and the
# loop variable no longer reuses the name of the imported `col` function.
index_cols = ['track', 'user_id']
indexer = [StringIndexer(inputCol=name, outputCol=name + '_index') for name in index_cols]

# The stages are passed as unfitted estimators; Pipeline.fit fits each of them.
pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph|               Leloo|    1|    21407.0|        171.0|
| --Seph|Virus (Luke Fair ...|    1|    35315.0|        171.0|
| --Seph|Airplanes [feat H...|    1|     2600.0|        171.0|
| --Seph|Belina (Original ...|    1|     9434.0|        171.0|
| --Seph|              Monday|    1|     1865.0|        171.0|
| --Seph|Hungarian Dance No 5|    1|     3934.0|        171.0|
| --Seph|       Life On Mars?|    1|      297.0|        171.0|
| --Seph|  California Waiting|    1|     1335.0|        171.0|
| --Seph|       Phantom Pt II|    1|     4972.0|        171.0|
| --Seph|   Summa for Strings|    1|    31009.0|        171.0|
| --Seph|      Hour for magic|    2|    18142.0|        171.0|
| --Seph|Hungarian Rhapsod...|    1|    18280.0|        171.0|
| --Seph|     The Way We Were|    1|    33434.0|       

In [16]:
# Keep only the two index columns and the listen count, sorted by user index.
model_columns = ['user_id_index', 'track_index', 'count']
data = data.select(*model_columns).sort('user_id_index')
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    28737.0|    1|
|          0.0|     9657.0|    1|
|          0.0|    14996.0|    1|
|          0.0|     3622.0|    2|
|          0.0|    22307.0|    3|
|          0.0|    35135.0|    1|
|          0.0|     3949.0|    1|
|          0.0|    25436.0|    1|
|          0.0|     3862.0|    2|
|          0.0|     7938.0|    1|
|          0.0|      701.0|    1|
|          0.0|    14343.0|    1|
|          0.0|    35504.0|    1|
|          0.0|    26660.0|    1|
|          0.0|      243.0|    1|
|          0.0|     6515.0|    2|
|          0.0|    26198.0|    1|
|          0.0|    36046.0|    1|
|          0.0|    29069.0|    1|
|          0.0|    11911.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Train and Test data

In [19]:
# 70/30 train/test split. The seed fixes the random sampling so the split does not
# change from one run of this cell to the next.
(training, test) = data.randomSplit([0.7, 0.3], seed=42)

# Let's Create our Model

In [20]:
# Column names used to configure ALS.
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'

# coldStartStrategy='drop' removes prediction rows for users or tracks that did not
# appear in the training split; with the default they come back as NaN, which would
# poison any metric computed on `predictions`.
# seed fixes the random factor initialisation so repeated fits give the same model.
# NOTE(review): 'count' is a listen count rather than an explicit rating; consider
# implicitPrefs=True if that matches the intent - left unchanged here.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol=USERID,
    itemCol=TRACK,
    ratingCol=COUNT,
    coldStartStrategy='drop',
    seed=42,
)
model = als.fit(training)

# Predicted listen counts for the held-out (user, track) pairs.
predictions = model.transform(test)



# Generate top 10 Track recommendations for each user

In [21]:
# For every user, the highest-scoring tracks according to the fitted model.
tracks_per_user = 10
recs = model.recommendForAllUsers(tracks_per_user)
recs.show()



+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{19458, 10.77220...|
|            1|[{14363, 15.37741...|
|            2|[{33819, 15.00418...|
|            3|[{37409, 14.46506...|
|            4|[{5940, 8.552277}...|
|            5|[{1603, 9.665731}...|
|            6|[{27395, 52.18092...|
|            7|[{33819, 19.06969...|
|            8|[{27266, 10.64205...|
|            9|[{37409, 12.78508...|
|           10|[{21959, 9.376755...|
|           11|[{19458, 13.03728...|
|           12|[{1237, 12.710482...|
|           13|[{27395, 19.99548...|
|           14|[{33819, 30.01852...|
|           15|[{37409, 15.54863...|
|           16|[{714, 10.645372}...|
|           17|[{37409, 10.5479}...|
|           18|[{19458, 24.0972}...|
|           19|[{609, 9.245275},...|
+-------------+--------------------+
only showing top 20 rows



In [22]:
# Pull the first recommendation row to the driver to inspect its full contents.
recs.limit(1).collect()

[Row(user_id_index=0, recommendations=[Row(track_index=19458, rating=10.77220344543457), Row(track_index=21484, rating=9.042082786560059), Row(track_index=609, rating=7.665572166442871), Row(track_index=944, rating=6.589339733123779), Row(track_index=15142, rating=6.584371089935303), Row(track_index=26877, rating=6.584371089935303), Row(track_index=12374, rating=6.584371089935303), Row(track_index=27266, rating=6.555593013763428), Row(track_index=11937, rating=6.07129430770874), Row(track_index=157, rating=5.5752692222595215)])]