# **WELCOME TO THIS NOTEBOOK**

In [None]:
# Mount Google Drive so files under /content/drive become readable from this Colab runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


Let's install pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=518a691181f8bb812b2ca4d6645d7caf2bf7b99279adb1304f8c261a52974bf6
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [None]:
# Create the Spark session for this notebook (or reuse one that already exists), named "lastfm".
session_builder = SparkSession.builder.appName("lastfm")
spark = session_builder.getOrCreate()

# Loading the dataset

The dataset is 4 GB in size. In each row we can see information about a specific user and a specific track they have listened to, in addition to the artist and the album of the track, and the date of listening.

In [None]:
# Read the listenings CSV from Google Drive.
# header=True      -> the first line holds the column names
# inferSchema=True -> Spark infers each column's type from the data
file_path = "/content/drive/MyDrive/dataset/listenings.csv"
csv_reader = spark.read.format('csv').options(header=True, inferSchema=True)
df_listening = csv_reader.load(file_path)
df_listening.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [None]:
# Drop the date column; it is not used in the rest of the notebook.
columns_to_drop = ['date']
df_listening = df_listening.drop(*columns_to_drop)
df_listening.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Remove every row that contains a null value in any column.
df_listening = df_listening.dropna()
df_listening.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Shape of the cleaned DataFrame as "rows columns" (the recorded run printed 13758905 4).
# .columns returns all column names as a list, so its length is the column count.
row_count, column_count = df_listening.count(), len(df_listening.columns)
print(row_count, column_count)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to a specific track


In [None]:
# Count how many times each user listened to each track, then sort the result by user.
play_counts = df_listening.groupBy('user_id', 'track').agg(count('*').alias('count'))
df_listening_agg = play_counts.orderBy('user_id')
df_listening_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|               Leloo|    1|
| --Seph|         The Embrace|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|               Julia|    1|
| --Seph|In the Nothing of...|    2|
| --Seph|          I Miss You|    1|
| --Seph| The Riders of Rohan|    1|
| --Seph|Sunset Soon Forgo...|    1|
| --Seph|   Barbados Carnival|    1|
| --Seph|      Fragile Meadow|    1|
| --Seph|          Stupid Kid|    1|
| --Seph|Every Direction I...|    2|
| --Seph|         If It Works|    1|
| --Seph|           So Lonely|    2|
| --Seph|    Kiss with a Fist|    1|
| --Seph|             Starman|    2|
| --Seph|         Left Behind|    2|
| --Seph|   Duel of the Fates|    1|
| --Seph|       Pressure Drop|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [None]:
# Shape of the aggregated DataFrame as "rows columns" (the recorded run printed 9930128 3).
# .columns returns all column names as a list, so its length is the column count.
row_count_agg, column_count_agg = df_listening_agg.count(), len(df_listening_agg.columns)
print(row_count_agg, column_count_agg)

9930128 3


In [None]:
 # Keep a fixed-size subset of the aggregated data so the following steps stay fast.
 # df_listening_agg is sorted by user_id only, which is not unique, so a bare limit()
 # may pick different rows each time the lazy plan is re-evaluated. Sorting on
 # (user_id, track) -- the groupBy keys, hence unique per row -- gives a total order,
 # so the same rows are selected on every evaluation.
 # cache() stores the subset so later actions do not redo the full aggregation.
 SAMPLE_SIZE = 20000
 df_listening_agg = df_listening_agg.orderBy('user_id', 'track').limit(SAMPLE_SIZE).cache()

# Let's convert the user id and track columns into unique integers




In [None]:
# StringIndexer maps string values to unique numeric indices.
# One fitted indexer is built for every column except 'count' (i.e. user_id and track);
# each writes its result to a new '<column>_index' column.
# - Columns are taken in DataFrame order rather than from a set, so the order of the
#   stages (and of the generated index columns) is the same on every run.
# - The loop variable is not called `col`, to avoid shadowing pyspark.sql.functions.col.
# - handleInvalid="keep" gives labels unseen during fit an extra index instead of failing.
index_columns = [name for name in df_listening_agg.columns if name != 'count']
indexer = [
    StringIndexer(inputCol=name, outputCol=name + '_index')
    .setHandleInvalid("keep")
    .fit(df_listening_agg)
    for name in index_columns
]

In [None]:
# create the pipeline with the indexer to pass the dataframe and transform the 2 columns
pipeline=Pipeline(stages=indexer)
data=pipeline.fit(df_listening_agg).transform(df_listening_agg)
data.show()

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph|          Nightmares|    1|         69.0|    10600.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15893.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      521.0|
| --Seph|Belina (Original ...|    1|         69.0|     3280.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7555.0|
| --Seph|       Life On Mars?|    1|         69.0|     1164.0|
| --Seph|  California Waiting|    1|         69.0|      195.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1378.0|
| --Seph|   Summa for Strings|    1|         69.0|    13737.0|
| --Seph|      Hour for magic|    2|         69.0|     7492.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7556.0|
| --Seph|     The Way We Were|    1|         69.0|    1

In [None]:
# selecting the columns with the indices and the count column
# Keep only the two index columns and the play count, sorted by user index.
model_columns = ['user_id_index', 'track_index', 'count']
data = data.select(*model_columns).orderBy('user_id_index')

In [None]:
# Preview the first 20 rows of the model input (long cell values are truncated).
data.show(n=20, truncate=True)

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph|          Nightmares|    1|         69.0|    10600.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15893.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      521.0|
| --Seph|Belina (Original ...|    1|         69.0|     3280.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7555.0|
| --Seph|       Life On Mars?|    1|         69.0|     1164.0|
| --Seph|  California Waiting|    1|         69.0|      195.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1378.0|
| --Seph|   Summa for Strings|    1|         69.0|    13737.0|
| --Seph|      Hour for magic|    2|         69.0|     7492.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7556.0|
| --Seph|     The Way We Were|    1|         69.0|    1

# Train and Test data

In [None]:
# splitting the training and testing dataset
# Split the data 50/50 into training and test sets.
# A fixed seed makes the split repeatable across runs.
SPLIT_SEED = 42
(training, test) = data.randomSplit([0.5, 0.5], seed=SPLIT_SEED)


# Let's Create our Model

In [None]:
# Column names used by the recommender.
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'
ALS_SEED = 42

# ALS is an iterative algorithm, so the number of iterations (maxIter) must be chosen.
# The "rating" here is the number of times each user has listened to a track.
# - coldStartStrategy="drop": users or tracks that appear only in the test split have no
#   learned factors; without this their predictions are NaN.
# - seed: fixes the random initialisation so repeated runs give the same model.
# NOTE(review): listen counts are implicit feedback; consider implicitPrefs=True -- confirm
# with the author before changing, since it alters the meaning of the predicted scores.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol=USERID,
    itemCol=TRACK,
    ratingCol=COUNT,
    coldStartStrategy="drop",
    seed=ALS_SEED,
)

# Fit the model on the training split.
model = als.fit(training)

# Predict listen counts for the (user, track) pairs of the test split.
predictions = model.transform(test)


# Generate top 10 Track recommendations for each user

In [None]:
# find 10 recomendations for each user
# Ask the fitted model for 10 track recommendations per user.
TOP_K = 10
recs = model.recommendForAllUsers(TOP_K)



In [None]:
# Preview the recommendations of the first 20 users (long cell values are truncated).
recs.show(n=20, truncate=True)

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{4460, 15.162046...|
|            1|[{180, 9.605092},...|
|            2|[{16908, 12.95268...|
|            3|[{461, 7.79856}, ...|
|            4|[{7847, 9.112072}...|
|            5|[{13401, 5.726028...|
|            6|[{1739, 11.786427...|
|            7|[{7847, 6.070025}...|
|            8|[{16908, 12.66082...|
|            9|[{1739, 13.227573...|
|           10|[{233, 7.2070556}...|
|           11|[{4460, 8.276293}...|
|           12|[{4460, 10.907128...|
|           13|[{461, 7.066088},...|
|           14|[{1694, 7.20497},...|
|           15|[{2485, 10.753057...|
|           16|[{2380, 5.5988297...|
|           17|[{1694, 9.510003}...|
|           18|[{2485, 9.491067}...|
|           19|[{16908, 21.60408...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
# show recommends for one user
# Fetch the recommendation row of a single user as a list containing one Row.
recs.limit(1).collect()

[Row(user_id_index=0, recommendations=[Row(track_index=4460, rating=15.162046432495117), Row(track_index=16908, rating=12.427143096923828), Row(track_index=304, rating=10.761274337768555), Row(track_index=308, rating=9.741501808166504), Row(track_index=9498, rating=9.52747631072998), Row(track_index=84, rating=9.447318077087402), Row(track_index=120, rating=9.330489158630371), Row(track_index=461, rating=8.363097190856934), Row(track_index=1439, rating=8.035255432128906), Row(track_index=1122, rating=7.639596462249756)])]

In [None]:
# Finished :)