<a href="https://colab.research.google.com/github/Ayman947/Songs-Recommendation-Engine-with-PySpark/blob/main/%F0%9F%8E%B8%E2%99%AB%E2%99%AB_Songs_Recommendation_Engine_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Importing Packages**

In [1]:
! pip install pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.mllib.recommendation import Rating
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=e68d5bafc77efa0d3e98dbc7ef6545b2a951edc3cc541c722ee935a10564bc23
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

## **Creating a Spark Session**

In [2]:
# Start (or reuse) a Spark session running locally with master 'local[*]'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('songs_recommender')
    .getOrCreate()
)
print("Spark's version: ", spark.version)

# Read back the runtime settings reported below.
app_name = spark.conf.get('spark.app.name')
driver_tcp_port = spark.conf.get('spark.driver.port')
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
# Optional tuning knob, intentionally left disabled:
# spark.conf.set("spark.sql.shuffle.partitions", 50)

# Report the session settings.
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Spark's version:  3.3.2
Name: songs_recommender
Driver TCP port: 44857
Number of partitions: 200


## **Collecting the Data**

In [5]:
# Load the ratings CSV (first row is the header, column types are inferred)
# and drop exact duplicate rows.
# Note: the resulting column names carry literal single quotes, e.g. "'userID'",
# which is why later cells select them with the quotes included.
ratings_csv = spark.read.csv(
    "songsDataset.csv",
    sep=',',
    header=True,
    inferSchema=True,
)
df = ratings_csv.distinct()
# Optional parquet export, left disabled:
# df.write.parquet('output.parquet', mode='overwrite')

## **Inspecting the data**

In [6]:
# Schema: column names and inferred types
df.printSchema()
print()

# Preview: first 5 rows
df.show(5)
print()

# Dimensions: row count (runs a Spark job) and column count
n_rows = df.count()
n_cols = len(df.columns)
print('Rows: ' + str(n_rows) + '   Columns: ' + str(n_cols))
print()

root
 |-- 'userID': integer (nullable = true)
 |-- 'songID': integer (nullable = true)
 |-- 'rating': integer (nullable = true)


+--------+--------+--------+
|'userID'|'songID'|'rating'|
+--------+--------+--------+
|      25|  105962|       1|
|      56|   77182|       5|
|      78|   48344|       4|
|      80|   81638|       5|
|     154|  116008|       1|
+--------+--------+--------+
only showing top 5 rows


Rows: 2000000   Columns: 3



## **EDA**

In [7]:
# Size of the full user x song matrix versus the number of observed ratings.
num_users = df.select("'userID'").distinct().count()
num_items = df.select("'songID'").distinct().count()
numerator = df.select("'rating'").count()      # ratings actually present
denominator = num_users * num_items            # every possible user/song pair

# Percentage of user/song pairs that have no rating.
sparsity = (1.0 - float(numerator) / denominator) * 100

print("Number of users: ", num_users)
print("Number of items: ", num_items)
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

Number of users:  200000
Number of items:  127771
The ratings dataframe is  99.99% empty.


## **Data Preparation & Preprocessing**

In [8]:
# Build the (userId, itemId, rating) frame used for training and evaluation.
#
# The joined frame is written to a NEW name (`encoded_df`) instead of
# rebinding `df`. Rebinding made this cell non-idempotent: on a second run
# `df` already carried `user_id_int` / `item_id_int`, the joins duplicated
# those columns, and the later `F.col('user_id_int')` became ambiguous.

# encoding user ids
# coalesce(1) puts all distinct ids into a single partition, so
# monotonically_increasing_id() produces consecutive values 0, 1, 2, ...
users = df.select("'userID'").distinct()
users = users.coalesce(1)
users = users.withColumn('user_id_int', monotonically_increasing_id()).persist()
users.show(5)


# encoding item ids (same approach as for users)
items = df.select("'songID'").distinct()
items = items.coalesce(1)
items = items.withColumn('item_id_int', monotonically_increasing_id()).persist()
items.show(5)


# joining the integer encodings back onto the raw ratings; `df` is left untouched
encoded_df = df.join(users, "'userID'", "left").join(items, "'songID'", "left")
encoded_df.show(5)


# ratings data: keep only the encoded ids and the rating, under plain column names
ratings_data = encoded_df.select(
    F.col('user_id_int').alias("userId"),
    F.col('item_id_int').alias("itemId"),
    F.col("'rating'").alias('rating'),
)
ratings_data.show(5)


# train-test split (80% / 20%), seeded for repeatability
(training_data, test_data) = ratings_data.randomSplit([0.8, 0.2], seed=96)

+--------+-----------+
|'userID'|user_id_int|
+--------+-----------+
|     148|          0|
|     463|          1|
|     471|          2|
|     496|          3|
|     833|          4|
+--------+-----------+
only showing top 5 rows

+--------+-----------+
|'songID'|item_id_int|
+--------+-----------+
|  102798|          0|
|   56680|          1|
|   18051|          2|
|   47711|          3|
|   19984|          4|
+--------+-----------+
only showing top 5 rows

+--------+--------+--------+-----------+-----------+
|'songID'|'userID'|'rating'|user_id_int|item_id_int|
+--------+--------+--------+-----------+-----------+
|  105962|      25|       1|     132127|      45335|
|   77182|      56|       5|     156269|      19734|
|   48344|      78|       4|       7005|      56859|
|   81638|      80|       5|     131125|     112589|
|  116008|     154|       1|      75054|      28057|
+--------+--------+--------+-----------+-----------+
only showing top 5 rows

+------+------+------+
|userId|ite

## **Modeling: ALS**

In [9]:
# Configure the ALS estimator. It is kept under its own name (`als`) so the
# unfitted estimator stays available after fitting; previously the fitted model
# overwrote it. An explicit seed makes the factor initialisation, and therefore
# the reported metrics, repeatable across kernel restarts.
als = ALS(userCol='userId',          # column that contains user ids
          itemCol='itemId',          # column that contains item ids
          ratingCol='rating',        # column that contains ratings
          rank=10,                   # no. of latent features
          maxIter=10,                # no. of ALS iterations
          regParam=0.5,              # regularization coefficient (λ)
          nonnegative=True,          # constrain the learned factors to be non-negative
          coldStartStrategy="drop",  # drop rows whose prediction is NaN (user/item unseen in training)
          implicitPrefs=False,       # ratings are treated as explicit feedback
          seed=96)                   # same seed value as the train/test split

# `als_model` is the fitted model; later cells use it for recommendations.
als_model = als.fit(training_data)

# Score the held-out ratings; adds a `prediction` column.
predictions = als_model.transform(test_data)

## **Evaluation: RMSE**

In [10]:
# RMSE between the true `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
test_rmse = evaluator.evaluate(predictions)

# Report the evaluator configuration followed by the score on the test split.
print(f'Evaluation Metric: {evaluator.getMetricName()}')
print(f'Label Column : {evaluator.getLabelCol()}')
print(f'predictions Column: {evaluator.getPredictionCol()}')
print('*' * 30)
print (f'RMSE = {round(test_rmse, 3)}')

Evaluation Metric: rmse
Label Column : rating
predictions Column: prediction
******************************
RMSE = 1.474


## **Optimization**

In [None]:
# Hyper-parameter search with k-fold cross-validation (currently disabled).
#
# NOTE(review): confirm the following before re-enabling this cell:
#   * In the modeling cell `als_model` is rebound to the result of `.fit(...)`,
#     so at this point it is a fitted model. `addGrid(...)` and
#     `CrossValidator(estimator=...)` presumably need the unfitted ALS
#     estimator and its Param objects instead - verify.
#   * The grid below is 4 x 4 x 3 = 48 parameter combinations, each trained on
#     5 folds, with maxIter values up to 500 - expect a very long run.
#   * Check that getRank() / getMaxIter() / getRegParam() exist on the object
#     returned by `cv.bestModel` - TODO confirm against the PySpark API.
#
# params = ParamGridBuilder()
# params = params.addGrid(als_model.rank, [5, 40, 80, 120])         # no. of latent features
# params = params.addGrid(als_model.maxIter, [5, 100, 250, 500])    # no. of ALS iterations
# params = params.addGrid(als_model.regParam, [0.05, 0.1, 1.5])     # regularization coefficient (λ)
# params = params.build()
# print(f'Number of models to be tested: {len(params)}' )


# cv = CrossValidator(estimator=als_model,
#                     estimatorParamMaps=params,
#                     evaluator=evaluator,
#                     numFolds=5, seed=96).fit(training_data)



# best_model = cv.bestModel
# best_predictions = best_model.transform(test_data)
# rmse = evaluator.evaluate(best_predictions)
# print("**Best Model**")
# print("  RMSE:", round(rmse, 3))
# print("  Rank:", best_model.getRank())
# print("  MaxIter:", best_model.getMaxIter())
# print("  RegParam:", best_model.getRegParam())

## **Recommendations**

In [11]:
# Top-5 item recommendations for every user known to the fitted model.
# (If the cross-validation cell is enabled, `best_model` can be used instead:)
# ALS_recommendations = best_model.recommendForAllUsers(5)
ALS_recommendations = als_model.recommendForAllUsers(numItems=5)

# Preview the first 5 users and their recommendation lists.
ALS_recommendations.show(n=5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{4404, 5.700958}...|
|     3|[{4404, 5.5138726...|
|     5|[{26653, 3.228396...|
|     6|[{3118, 4.5049295...|
|     9|[{127675, 4.38110...|
+------+--------------------+
only showing top 5 rows



In [12]:
# Stop the Spark session now that the analysis is finished.
# Any cell that uses `spark` after this point needs the session to be created again.
spark.stop()