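3. Build a Recommendation Engine with Spark Using a Dataset of Your Choice

This notebook trains a collaborative-filtering model (Spark ML's ALS) on the MovieLens 20M ratings dataset (`ml-20m`). It evaluates the model with RMSE on a held-out test set and generates top-5 recommendations per user and per movie.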

Import Necessary Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator

Create a Spark Session

In [3]:
# A single Spark session shared by every cell below. getOrCreate() returns the
# already-running session if this cell is executed again.
# NOTE: JVM memory settings such as spark.driver.memory only take effect when the
# session's JVM is first launched; restart the kernel to change them.
spark = (
    SparkSession.builder
    .appName("RecommendationEngine")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

25/04/22 23:57:32 WARN Utils: Your hostname, Boggavarapus-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.203 instead (on interface en0)
25/04/22 23:57:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/22 23:57:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/04/22 23:57:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Load the Dataset

In [4]:
# Read the MovieLens CSVs with explicit schemas. inferSchema=True would make Spark
# scan each file one extra time just to guess the column types, which is costly
# for a ratings file of this size. The types below match what inference produced,
# except that timestamp is widened to BIGINT so it never overflows.
ratings = spark.read.csv(
    "ml-20m/ratings.csv",
    header=True,
    schema="userId INT, movieId INT, rating DOUBLE, timestamp BIGINT",
)
# NOTE(review): movie titles may contain embedded quotes written RFC 4180-style
# as ""; escape='"' lets the CSV reader unescape them. Confirm against the data.
movies = spark.read.csv(
    "ml-20m/movies.csv",
    header=True,
    schema="movieId INT, title STRING, genres STRING",
    escape='"',
)

# Show a preview of the ratings data
ratings.show(5)


                                                                                

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



Data Preprocessing

In [5]:
# Keep only the columns ALS needs and cast them to the types it expects:
# integer user/movie ids and a float rating. Re-running this cell is safe
# because the selected columns still exist after the first run.
ratings = (
    ratings
    .select(
        col("userId").cast("int").alias("userId"),
        col("movieId").cast("int").alias("movieId"),
        col("rating").cast("float").alias("rating"),
    )
    # ALS.fit fails on null ids or ratings, and a failed cast yields null,
    # so drop any such rows before training.
    .dropna(subset=["userId", "movieId", "rating"])
)

Split the Data for Training and Testing

In [6]:
# Hold out 20% of the ratings for evaluation. The fixed seed makes the split,
# and therefore the reported RMSE, reproducible across kernel restarts.
(training_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=42)

Train the Recommendation Model Using ALS

In [7]:
# Matrix-factorization model: learn rank-10 latent factors for users and movies
# from the training ratings (implicitPrefs is left at its default).
# - coldStartStrategy="drop" removes predictions for users/movies unseen during
#   training. Those predictions would otherwise be NaN and make the RMSE NaN.
# - seed fixes the random factor initialization so training is reproducible.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,
    maxIter=10,
    regParam=0.1,
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training_data)

25/04/22 23:58:57 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/04/22 23:58:57 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/04/22 23:58:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

Make Predictions

In [8]:
# Score each held-out (userId, movieId) pair with the trained model. The result
# keeps the test columns and adds a "prediction" column.
predictions = model.transform(test_data)
predictions.show(n=5)

                                                                                

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   137|   1591|   2.0|  2.706579|
|   251|   1580|   4.0|  4.089978|
|   251|   2142|   3.0| 3.4444299|
|   271|   1088|   0.5| 2.5872908|
|   271|   5803|   1.5| 2.5559204|
+------+-------+------+----------+
only showing top 5 rows



Evaluate the Model

In [9]:
# Root-mean-square error between the actual held-out ratings and the model's
# predictions (lower is better).
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-Mean-Square Error = {rmse}")

[Stage 198:>                                                        (0 + 8) / 8]

Root-Mean-Square Error = 0.8091666998656216


                                                                                

Generate the Top-5 Movie Recommendations for Each User

In [10]:
# For every user, the 5 movies with the highest predicted rating. Each row holds
# a userId and an array of (movieId, predicted rating) entries.
user_recommendations = model.recommendForAllUsers(numItems=5)
user_recommendations.show(n=5)



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    12|[{127021, 5.39941...|
|    26|[{127021, 5.52069...|
|    27|[{74159, 5.785264...|
|    28|[{101862, 5.87432...|
|    31|[{77931, 6.340824...|
+------+--------------------+
only showing top 5 rows



                                                                                

Generate the Top-5 Users for Each Movie

In [11]:
# The reverse direction: for every movie, the 5 users predicted to rate it
# highest. Each row holds a movieId and an array of (userId, predicted rating) entries.
movie_recommendations = model.recommendForAllItems(numUsers=5)
movie_recommendations.show(n=5)



+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     12|[{49868, 4.75084}...|
|     26|[{23589, 5.088555...|
|     27|[{37097, 5.089628...|
|     28|[{53192, 5.247551...|
|     31|[{23589, 4.983071...|
+-------+--------------------+
only showing top 5 rows



                                                                                

Stop Spark Session

In [None]:
# Shut down the session created at the top of the notebook and release its resources.
spark.stop()
print("Spark session stopped successfully.")