# Music Recommendation System Using PySpark

## Project Goal

Create a Music recommendation system based on Users choice using the ALS (alternating least squares) algorithm in Pyspark.

In [1]:
#from google.colab import drive
#drive.mount('/content/drive')

In [2]:
#!pip install pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [4]:
# Create a spark session

spark = SparkSession.builder.appName("lastfm").getOrCreate()

## Loading the dataset

In [5]:
# download the data set

file = './listenings.csv'

df_listen = spark.read.format('csv').option('header',True).option('inferSchema',True).load(file)


## Cleaning tables

In [6]:
df_listen.show(5)

+-----------+-------------+--------------------+---------+-----------------+
|    user_id|         date|               track|   artist|            album|
+-----------+-------------+--------------------+---------+-----------------+
|000Silenced|1299680100000|           Price Tag| Jessie J|      Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...| Jessie J|        Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|    Robyn|         Be Mine!|
|000Silenced|1299679200000|            Acapella|    Kelis|         Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|The Tease|I'm Not Invisible|
+-----------+-------------+--------------------+---------+-----------------+
only showing top 5 rows



In [7]:
df_listen = df_listen.drop('date')
df_listen = df_listen.na.drop() # remove nan values
df_listen.show(5)

+-----------+--------------------+---------+-----------------+
|    user_id|               track|   artist|            album|
+-----------+--------------------+---------+-----------------+
|000Silenced|           Price Tag| Jessie J|      Who You Are|
|000Silenced|Price Tag (Acoust...| Jessie J|        Price Tag|
|000Silenced|Be Mine! (Ballad ...|    Robyn|         Be Mine!|
|000Silenced|            Acapella|    Kelis|         Acapella|
|000Silenced|   I'm Not Invisible|The Tease|I'm Not Invisible|
+-----------+--------------------+---------+-----------------+
only showing top 5 rows



In [8]:
df_listen.columns

['user_id', 'track', 'artist', 'album']

In [9]:
df_listen.count()

13758905


# Aggregations
to see how many times each user has listened to specific track


In [10]:
from pyspark.sql.functions import count

# Aggregate and count occurrences of each combination of 'user_id' and 'track'
df_listen_agg = df_listen.select('user_id', 'track') \
    .groupby('user_id', 'track') \
    .agg(count('*').alias('count')) \
    .orderBy('user_id')

df_listen_agg = df_listen_agg.na.drop()

# Show the aggregated DataFrame
df_listen_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|             Andante|    1|
| --Seph|Airplanes [feat H...|    1|
| --Seph|           2 and 2=5|    1|
| --Seph|Belina (Original ...|    1|
| --Seph|                0040|    1|
| --Seph|  California Waiting|    1|
| --Seph|        5 Years Time|    1|
| --Seph|      Hour for magic|    2|
| --Seph|             401 Lwa|    2|
| --Seph|Hungarian Dance No 5|    1|
| --Seph|         Agoraphobia|    1|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph|        After Nature|    1|
| --Seph|       Life On Mars?|    1|
| --Seph|       Amazing Grace|    1|
| --Seph|              Monday|    1|
| --Seph| Air on the G String|    1|
| --Seph|       Phantom Pt II|    1|
| --Seph|   Duel of the Fates|    1|
| --Seph|   Summa for Strings|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [11]:
df_listen_agg.count()

9930128

## Convert the user id and track columns into unique integers




In [12]:
# limit the data set since it has around a million rows to 1000 rows

df_listen_agg = df_listen_agg.limit(10000)

In [13]:
indexer = [StringIndexer(inputCol=col, outputCol=col+'_index').fit(df_listen_agg) for col in ['user_id','track']]
pipeline = Pipeline(stages=indexer)

data=  pipeline.fit(df_listen_agg).transform(df_listen_agg)

data = data.na.drop()
data.show(5)

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph| Air on the G String|    1|         34.0|      981.0|
| --Seph|Belina (Original ...|    1|         34.0|     1448.0|
| --Seph|  California Waiting|    1|         34.0|      176.0|
| --Seph|      Hour for magic|    2|         34.0|     3807.0|
| --Seph|Hungarian Dance No 5|    1|         34.0|     3844.0|
+-------+--------------------+-----+-------------+-----------+
only showing top 5 rows



In [14]:
data.columns

['user_id', 'track', 'count', 'user_id_index', 'track_index']

In [15]:
df = data.select('user_id_index','track_index','count').orderBy('user_id')
df = df.na.drop()
df.show(5)

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|         34.0|      981.0|    1|
|         34.0|     1448.0|    1|
|         34.0|      176.0|    1|
|         34.0|     3807.0|    2|
|         34.0|     3844.0|    1|
+-------------+-----------+-----+
only showing top 5 rows



# Train and Test data

In [24]:
# Perform a 70-30 split
(train, test) = df.randomSplit([0.7, 0.3], seed = 2)


In [25]:
df.columns

['user_id_index', 'track_index', 'count']

In [26]:
train.show(10)

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|       11.0|    1|
|          0.0|       18.0|    1|
|          0.0|       39.0|    1|
|          0.0|       41.0|    1|
|          0.0|      175.0|    2|
|          0.0|      244.0|    1|
|          0.0|      390.0|    1|
|          0.0|      462.0|    1|
|          0.0|      509.0|    1|
|          0.0|      553.0|    1|
+-------------+-----------+-----+
only showing top 10 rows



In [27]:
test.show(10)

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|      155.0|    1|
|          0.0|      431.0|    1|
|          0.0|      732.0|    1|
|          0.0|      782.0|    1|
|          0.0|      977.0|    1|
|          0.0|     1124.0|    1|
|          0.0|     1190.0|    1|
|          0.0|     1203.0|    1|
|          0.0|     1227.0|    1|
|          0.0|     1291.0|    1|
+-------------+-----------+-----+
only showing top 10 rows



## Create ALS Model

In [20]:
als = ALS(maxIter=5, regParam=0.01, userCol='user_id_index', itemCol='track_index', ratingCol='count')


In [21]:
model = als.fit(train)

pred = model.transform(test)



# Generate top 10 Track recommendations for each user

In [22]:
recs = model.recommendForAllUsers(10)

recs.show(10)

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|           20|[{1286, 6.159645}...|
|           40|[{310, 6.5270967}...|
|           10|[{7197, 5.062258}...|
|           50|[{992, 6.1635942}...|
|           70|[{8233, 23.012083...|
|           60|[{6824, 10.988994...|
|           30|[{7197, 6.483232}...|
|            0|[{4889, 8.662129}...|
|           31|[{76, 5.7290225},...|
|            1|[{5920, 4.9742584...|
+-------------+--------------------+
only showing top 10 rows



In [30]:
rec_user_1 = recs.take(1)

In [31]:
all_track_ids = []

# Iterate over each row in the list of recommendations data
for row in rec_user_1:
    recommendations = row.recommendations
    
    # Iterate over each recommendation for the current user
    for recommendation in recommendations:
        track_index = recommendation.track_index
        
        # Append the track index to the list of track IDs
        all_track_ids.append(track_index)

# Print the list of all track IDs
print("All Track IDs:", all_track_ids)

All Track IDs: [1286, 3532, 9074, 5563, 4278, 6824, 7473, 7197, 6949, 3530]


In [32]:
track_index_to_name = {}

# Iterate over the unique track indices in the DataFrame 'data'
for row in data.select('track_index', 'track').distinct().collect():
    track_index_to_name[row['track_index']] = row['track']

# Define a function to get track name for a given track index
def get_track_name(track_index):
    return track_index_to_name.get(track_index, None)

In [37]:
trk_idx = all_track_ids

tn_list = []
for idx in trk_idx:
    tn = get_track_name(idx)
    tn_list.append(tn)
    

print('The 10 highly recommended songs for the user are :')
for i,n in enumerate(tn_list):
    print(i+1,n)

The 10 highly recommended songs for the user are :
1 Away From Here
2 Had Enough
3 You're Not Alone
4 No Rain
5 It's Not OK
6 Shrouded in Blinding Light
7 Technodanceaphobic
8 Steer the Canyon
9 Small Things
10 Habits (Stay High)
