# **WELCOME TO THIS NOTEBOOK**

Let's mount Google Drive (where the dataset is stored) and install pyspark.

In [None]:
# Mount Google Drive at /content/drive; the dataset path used later in this
# notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 70kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 34.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=34ef706d2276cca5aeee017df2aac469c9450c13f860280abd5f60b7fb2102b2
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [None]:
# Create the session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('lastfm')
    .getOrCreate()
)

# Loading the dataset

In [None]:
# Location of the listenings CSV on the mounted Drive.
file_path = '/content/drive/MyDrive/Colab Notebooks/Projects/dataset/listenings.csv'

# First row is the header; let Spark infer the column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_listenings = csv_reader.csv(file_path)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [None]:
# `date` is not used by any later cell in this notebook, so remove it here.
unused_columns = ['date']
df_listenings = df_listenings.drop(*unused_columns)
df_listenings

DataFrame[user_id: string, track: string, artist: string, album: string]

In [None]:
# Preview the first 20 rows with long cell values truncated (the defaults).
df_listenings.show(n=20, truncate=True)

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Remove every row that has a null in any column, then preview the result.
df_listenings = df_listenings.dropna()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For


# Let's perform some aggregation
to see how many times each user has listened to each track


In [None]:
# Shape of the cleaned table: number of rows, then number of columns.
columns = len(df_listenings.columns)
rows = df_listenings.count()
print(f"{rows} {columns}")

13758905 4


In [None]:
# Count how many rows exist for each (user_id, track) pair, i.e. how many
# times that user listened to that track, and order the result by user.
user_track = df_listenings.select('user_id', 'track')
plays_per_pair = user_track.groupby('user_id', 'track').agg(
    count('*').alias('count')
)
df_listenings_agg = plays_per_pair.orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|        Window Blues|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|   Summa for Strings|    1|
| --Seph|         The Embrace|    1|
| --Seph|       Phantom Pt II|    1|
| --Seph|  California Waiting|    1|
| --Seph|     The Way We Were|    1|
| --Seph| Air on the G String|    1|
| --Seph|              Monday|    1|
| --Seph|Belina (Original ...|    1|
| --Seph|       Life On Mars?|    1|
| --Seph|               Leloo|    1|
| --Seph|Hungarian Dance No 5|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|Airplanes [feat H...|    1|
| --Seph|      Hour for magic|    2|
| --Seph|Virus (Luke Fair ...|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [None]:
# Shape of the aggregated table: number of rows, then number of columns.
col_number = len(df_listenings_agg.columns)
row_number = df_listenings_agg.count()
print(f"{row_number} {col_number}")

9930128 3


In [None]:
# Keep only the first 20000 aggregated rows for the rest of the notebook.
max_agg_rows = 20000
df_listenings_agg = df_listenings_agg.limit(max_agg_rows)

# Let's convert the user id and track columns into unique integers




In [None]:
# Index every column except the aggregated listen count. Iterating the
# DataFrame's own column list (instead of a set difference) keeps the stage
# order - and therefore the order of the generated *_index columns - the
# same on every run.
index_columns = [name for name in df_listenings_agg.columns if name != 'count']

# Unfitted StringIndexer stages: Pipeline.fit() below is what fits them, so
# nothing is fitted outside the pipeline. The loop variable is named `name`
# so it does not shadow the imported pyspark.sql.functions.col.
indexer = [
    StringIndexer(inputCol=name, outputCol=name + '_index')
    for name in index_columns
]

pipeline = Pipeline(stages=indexer)
data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph| White Winter Hymnal|    3|         69.0|       59.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15896.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      519.0|
| --Seph|Belina (Original ...|    1|         69.0|     3278.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7558.0|
| --Seph|       Life On Mars?|    1|         69.0|     1161.0|
| --Seph|  California Waiting|    1|         69.0|      197.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1377.0|
| --Seph|   Summa for Strings|    1|         69.0|    13739.0|
| --Seph|      Hour for magic|    2|         69.0|     7495.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7559.0|
| --Seph|     The Way We Were|    1|         69.0|    1

In [None]:
# Keep only the numeric index columns and the listen count, ordered by user index.
model_columns = ['user_id_index', 'track_index', 'count']
data = (
    data
    .select(*model_columns)
    .orderBy('user_id_index')
)

In [None]:
# Preview the first 20 rows with long cell values truncated (the defaults).
data.show(n=20, truncate=True)

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph| White Winter Hymnal|    3|         69.0|       59.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15896.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      519.0|
| --Seph|Belina (Original ...|    1|         69.0|     3278.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7558.0|
| --Seph|       Life On Mars?|    1|         69.0|     1161.0|
| --Seph|  California Waiting|    1|         69.0|      197.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1377.0|
| --Seph|   Summa for Strings|    1|         69.0|    13739.0|
| --Seph|      Hour for magic|    2|         69.0|     7495.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7559.0|
| --Seph|     The Way We Were|    1|         69.0|    1

# Train and Test data

In [None]:
# Split the data 50/50 into training and test sets. A fixed seed makes the
# random assignment repeatable instead of changing on every run.
(training, test) = data.randomSplit([0.5, 0.5], seed=42)

# Let's Create our Model

In [None]:
# Column names consumed by ALS.
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'

# coldStartStrategy='drop': the random split can leave users or tracks in
# `test` that never appear in `training`; with the default strategy ('nan')
# their predictions are NaN. Dropping those rows keeps `predictions` free of NaN.
# seed: fixes the random initialisation of the factors so the fit is repeatable.
# NOTE(review): `count` is a listen count rather than an explicit rating;
# consider implicitPrefs=True - confirm the intended formulation before changing.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol=USERID,
    itemCol=TRACK,
    ratingCol=COUNT,
    coldStartStrategy='drop',
    seed=42,
)
model = als.fit(training)

# Score the held-out half with the fitted model.
predictions = model.transform(test)


# Generate top 10 Track recommendations for each user

In [None]:
# Ask the fitted model for the 10 highest-scored tracks per user.
tracks_per_user = 10
recs = model.recommendForAllUsers(tracks_per_user)

In [None]:
# Preview the first 20 users' recommendations, truncated (the defaults).
recs.show(n=20, truncate=True)

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|           31|[[11941, 41.22383...|
|           85|[[4461, 24.862276...|
|          137|[[11941, 22.65895...|
|           65|[[11941, 24.01278...|
|           53|[[11941, 7.842286...|
|          133|[[14826, 17.99655...|
|           78|[[11941, 25.85875...|
|          108|[[4461, 27.43491]...|
|           34|[[16909, 17.96572...|
|          101|[[16909, 15.64545...|
|          115|[[11941, 12.16764...|
|          126|[[568, 13.998391]...|
|           81|[[2484, 11.133461...|
|           28|[[4461, 14.521337...|
|           76|[[11941, 34.28729...|
|           26|[[11941, 48.85452...|
|           27|[[4461, 13.800264...|
|           44|[[16969, 6.670264...|
|          103|[[4461, 19.330572...|
|           12|[[16909, 14.76492...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
# Fetch a single row to inspect the full, untruncated recommendation list.
recs.limit(1).collect()

[Row(user_id_index=31, recommendations=[Row(track_index=11941, rating=41.22383117675781), Row(track_index=460, rating=10.276081085205078), Row(track_index=1325, rating=8.22818660736084), Row(track_index=12192, rating=7.963444232940674), Row(track_index=1693, rating=7.450974941253662), Row(track_index=348, rating=7.448973655700684), Row(track_index=8037, rating=7.127445220947266), Row(track_index=13107, rating=7.0126824378967285), Row(track_index=5293, rating=6.761024475097656), Row(track_index=2245, rating=5.070768356323242)])]