# **WELCOME TO THIS NOTEBOOK**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Let's install pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=caa6d535c2d81cfa4e0592cbbf89d3d12db49d29a73a93ff419a5c24fe365ab5
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [None]:
spark = SparkSession.builder.appName("lastfn").getOrCreate()

# Loading the dataset

In [9]:
file_path = '/content/drive/MyDrive/listenings.csv'
df_listenings = spark.read.format('csv').option('header', True).option('inferSchema', True).load(file_path)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables

In [11]:
df_listenings = df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [12]:
df_listenings = df_listenings.na.drop()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [13]:
row_numbers = df_listenings.count()
column_numbers = len(df_listenings.columns)
print(row_numbers, column_numbers)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to specific track


In [14]:
df_listenings_agg = df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+--------+
|user_id|               track|count(1)|
+-------+--------------------+--------+
| --Seph| White Winter Hymnal|       3|
| --Seph|         The Funeral|       1|
| --Seph|Hope There's Someone|       1|
| --Seph|         The Painter|       1|
| --Seph|          Nightmares|       1|
| --Seph|            War Pigs|       1|
| --Seph|                 F12|       1|
| --Seph|                Team|       1|
| --Seph|          Je te veux|       1|
| --Seph|               Radio|       1|
| --Seph|   All I Want Is You|       1|
| --Seph|    Little by Little|       2|
| --Seph|          Ode to Joy|       1|
| --Seph|In the Hall of th...|       1|
| --Seph|   Hey There Delilah|       1|
| --Seph|   Let's Call It Off|       1|
| --Seph|               Leloo|       1|
| --Seph|             Pack Up|       1|
| --Seph|           Introitus|       1|
| --Seph|        The Leanover|       1|
+-------+--------------------+--------+
only showing top 20 rows



In [15]:
df_listenings_agg = df_listenings_agg.limit(20000)

# Let's convert the user id and track columns into unique integers




In [18]:
indexer = [StringIndexer(inputCol=col, outputCol=col+'_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns) - set(['count']))]

pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+--------+--------------+-------------+-----------+
|user_id|               track|count(1)|count(1)_index|user_id_index|track_index|
+-------+--------------------+--------+--------------+-------------+-----------+
| --Seph|          Nightmares|       1|           0.0|         69.0|    10600.0|
| --Seph|Virus (Luke Fair ...|       1|           0.0|         69.0|    15893.0|
| --Seph|Airplanes [feat H...|       1|           0.0|         69.0|      521.0|
| --Seph|Belina (Original ...|       1|           0.0|         69.0|     3280.0|
| --Seph|              Monday|       1|           0.0|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|       1|           0.0|         69.0|     7555.0|
| --Seph|       Life On Mars?|       1|           0.0|         69.0|     1164.0|
| --Seph|  California Waiting|       1|           0.0|         69.0|      195.0|
| --Seph|       Phantom Pt II|       1|           0.0|         69.0|     1378.0|
| --Seph|   Summa for String

In [46]:
data = data.select('user_id_index', 'track_index', 'count(1)').orderBy('user_id_index')

In [43]:
data.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


# Train and Test data

In [40]:
(training, test) = data.randomSplit([0.5,0.5])

# Let's Create our Model

In [48]:
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count(1)'

als = ALS(maxIter=5, regParam=0.01, userCol=USERID, itemCol=TRACK, ratingCol=COUNT)

model = als.fit(training)

predictions = model.transform(test)

Py4JJavaError: ignored


# Generate top 10 Track recommendations for each user

In [None]:
recs = model.recommendForAllUsers(10)

In [None]:
recs.show()

In [None]:
recs.take(1)