# **WELCOME TO THIS NOTEBOOK**

Let's install pyspark

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install pyspark==3.0.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 34 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 38.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=54c63b264dd179aa2a3c74fc4975f9025523bffdcdc331cbf7a56d675c6a7730
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


Importing the modules

In [None]:
https://drive.google.com/drive/folders/1ZhV5LxuRBYiD81HJHQTheQawXZ5MkPim

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [6]:
spark = SparkSession.builder.appName("lastfm").getOrCreate()

# Loading the dataset

In [7]:
file_path = '/content/drive/MyDrive/dataset/listenings.csv'
df_listenings = spark.read.format('csv').option('header', True).option('inferSchema', True).load(file_path)
df_listenings.show(5)

+-----------+-------------+--------------------+---------+-----------------+
|    user_id|         date|               track|   artist|            album|
+-----------+-------------+--------------------+---------+-----------------+
|000Silenced|1299680100000|           Price Tag| Jessie J|      Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...| Jessie J|        Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|    Robyn|         Be Mine!|
|000Silenced|1299679200000|            Acapella|    Kelis|         Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|The Tease|I'm Not Invisible|
+-----------+-------------+--------------------+---------+-----------------+
only showing top 5 rows




# Cleaning tables 

In [8]:
df_listenings = df_listenings.drop('date')
df_listenings.show(2)

+-----------+--------------------+--------+-----------+
|    user_id|               track|  artist|      album|
+-----------+--------------------+--------+-----------+
|000Silenced|           Price Tag|Jessie J|Who You Are|
|000Silenced|Price Tag (Acoust...|Jessie J|  Price Tag|
+-----------+--------------------+--------+-----------+
only showing top 2 rows



In [9]:
df_listenings = df_listenings.na.drop()
df_listenings.show(3)

+-----------+--------------------+--------+-----------+
|    user_id|               track|  artist|      album|
+-----------+--------------------+--------+-----------+
|000Silenced|           Price Tag|Jessie J|Who You Are|
|000Silenced|Price Tag (Acoust...|Jessie J|  Price Tag|
|000Silenced|Be Mine! (Ballad ...|   Robyn|   Be Mine!|
+-----------+--------------------+--------+-----------+
only showing top 3 rows



In [11]:
row_numbers = df_listenings.count()
column_numbers = len(df_listenings.columns)
print(row_numbers, column_numbers)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to specific track


In [10]:
df_listenings_agg = df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show(3)

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|Airplanes [feat H...|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|Virus (Luke Fair ...|    1|
+-------+--------------------+-----+
only showing top 3 rows



In [None]:
row_numbers = df_listenings_agg.count()
column_numbers = len(df_listenings_agg.columns)
print(row_numbers, column_numbers)

9930128 3


In [12]:
df_listenings_agg = df_listenings_agg.limit(20000)

# Let's convert the user id and track columns into unique integers




In [13]:
indexer = [StringIndexer(inputCol = col, outputCol = col+ '_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns)-set(['count']))]
pipeline = Pipeline(stages = indexer)
data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show(10)

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph| White Winter Hymnal|    3|       59.0|         69.0|
| --Seph|Virus (Luke Fair ...|    1|    15896.0|         69.0|
| --Seph|Airplanes [feat H...|    1|      519.0|         69.0|
| --Seph|Belina (Original ...|    1|     3278.0|         69.0|
| --Seph|              Monday|    1|      334.0|         69.0|
| --Seph|Hungarian Dance No 5|    1|     7558.0|         69.0|
| --Seph|       Life On Mars?|    1|     1161.0|         69.0|
| --Seph|  California Waiting|    1|      197.0|         69.0|
| --Seph|       Phantom Pt II|    1|     1377.0|         69.0|
| --Seph|   Summa for Strings|    1|    13739.0|         69.0|
+-------+--------------------+-----+-----------+-------------+
only showing top 10 rows



In [14]:
newdata = data.select('user_id_index', 'track_index', 'count').orderBy('user_id_index')
newdata.show(5)

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    10943.0|    1|
|          0.0|    14044.0|    1|
|          0.0|     1349.0|    1|
|          0.0|      381.0|    1|
|          0.0|     8692.0|    1|
+-------------+-----------+-----+
only showing top 5 rows



# Train and Test data

In [15]:
(training, test) = newdata.randomSplit([0.5,0.5])

# Let's Create our Model

In [16]:
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'

als = ALS(maxIter = 5, regParam = 0.01, userCol = USERID, itemCol = TRACK, ratingCol = COUNT)
model = als.fit(training)

predictions = model.transform(test)



# Generate top 10 Track recommendations for each user

In [17]:
recs = model.recommendForAllUsers(10)

In [18]:
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|           31|[[8672, 8.960984]...|
|           85|[[348, 7.0902405]...|
|          137|[[14301, 7.436167...|
|           65|[[4461, 12.145633...|
|           53|[[14301, 12.71358...|
|          133|[[7849, 17.996586...|
|           78|[[12399, 9.590354...|
|          108|[[16909, 13.98388...|
|           34|[[460, 10.257157]...|
|          101|[[16909, 17.60687...|
|          115|[[12696, 9.694574...|
|          126|[[16909, 14.30144...|
|           81|[[2484, 6.6794314...|
|           28|[[4461, 15.362949...|
|           76|[[2484, 7.1418614...|
|           26|[[12399, 8.961149...|
|           27|[[4461, 7.9478016...|
|           44|[[4461, 19.863472...|
|          103|[[17297, 5.983408...|
|           12|[[16909, 12.60335...|
+-------------+--------------------+
only showing top 20 rows



In [19]:
recs.take(1)

[Row(user_id_index=31, recommendations=[Row(track_index=8672, rating=8.960984230041504), Row(track_index=14301, rating=5.808537006378174), Row(track_index=5937, rating=5.531208038330078), Row(track_index=348, rating=4.992094993591309), Row(track_index=7849, rating=4.344074249267578), Row(track_index=1119, rating=4.03563117980957), Row(track_index=8053, rating=4.03563117980957), Row(track_index=10733, rating=3.9826598167419434), Row(track_index=2484, rating=3.9800233840942383), Row(track_index=9557, rating=3.8613994121551514)])]