# Q1

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("MovieRecommendation")
    .getOrCreate()
)

# Read the JSON file, keep only the three columns selected here, and mark the
# result for caching since `ratings` is reused after this point.
# `cache()` returns the same DataFrame, so it can be chained onto the load.
ratings = (
    spark.read
    .json("movies.json")
    .select("user_id", "product_id", "score")
    .cache()
)

# Quick sanity check: inferred schema plus the first 20 rows.
ratings.printSchema()
ratings.show(20)



root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- score: double (nullable = true)

+--------------+----------+-----+
|       user_id|product_id|score|
+--------------+----------+-----+
|A141HP4LYPWMSR|B003AI2VGA|  3.0|
|A328S9RN3U5M68|B003AI2VGA|  3.0|
|A1I7QGUDP043DG|B003AI2VGA|  5.0|
|A1M5405JH9THP9|B003AI2VGA|  3.0|
| ATXL536YX71TR|B003AI2VGA|  3.0|
|A3QYDL5CDNYN66|B003AI2VGA|  2.0|
| AQJVNDW6YZFQS|B003AI2VGA|  1.0|
| AD4CDZK7D31XP|B00006HAXW|  5.0|
|A3Q4S5DFVPB70D|B00006HAXW|  5.0|
|A2P7UB02HAVEPB|B00006HAXW|  5.0|
|A2TX99AZKDK0V7|B00006HAXW|  4.0|
| AFC8IKR407HSK|B00006HAXW|  5.0|
|A1FRPGQYQTAOR1|B00006HAXW|  5.0|
|A1RSDE90N6RSZF|B00006HAXW|  5.0|
|A1OUBOGB5970AO|B00006HAXW|  4.0|
|A3NPHQVIY59Y0Y|B00006HAXW|  5.0|
| AFKMBAY28XO8A|B00006HAXW|  5.0|
| A66KMXH9V7OGU|B00006HAXW|  5.0|
| AFJ27ZV9183B8|B00006HAXW|  5.0|
| AXMKAXC0TR9AW|B00006HAXW|  5.0|
+--------------+----------+-----+
only showing top 20 rows



# Q2

In [2]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Seed shared by the train/validation split and by ALS so that a
# Restart & Run All reproduces the same model and metrics.
SEED = 42

# Map the string user/product identifiers to numeric index columns
# ("user_id_index", "product_id_index"), which are the columns handed to ALS.
# The stages are passed to the Pipeline *unfitted*: the Pipeline fits each one
# exactly once, instead of fitting every indexer by hand and then running
# Pipeline.fit over the already-fitted models a second time.
index_stages = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in ["user_id", "product_id"]
]

pipeline = Pipeline(stages=index_stages)
pipeline_model = pipeline.fit(ratings)

# Keep `indexers` bound to the fitted StringIndexer models, as before, so any
# later cell that inspects them (e.g. their labels) keeps working.
indexers = pipeline_model.stages
ratings_indexed = pipeline_model.transform(ratings)

# 80/20 split; the weights are normalised by Spark, the seed makes it repeatable.
training_data, validation_data = ratings_indexed.randomSplit([8.0, 2.0], seed=SEED)

# coldStartStrategy="drop" removes validation rows whose user or item was not
# seen during training, so the evaluator never receives NaN predictions.
# NOTE(review): the captured output of an earlier run shows negative predictions
# for scores that look like a 1-5 scale; consider nonnegative=True and/or a
# larger regParam, and confirm the effect on validation RMSE before changing.
als = ALS(
    userCol="user_id_index",
    itemCol="product_id_index",
    ratingCol="score",
    rank=10,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=SEED,
)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="score",
    predictionCol="prediction",
)

model = als.fit(training_data)
predictions = model.transform(validation_data)
predictions.show(10, False)

+--------------+----------+-----+-------------+----------------+-----------+
|user_id       |product_id|score|user_id_index|product_id_index|prediction |
+--------------+----------+-----+-------------+----------------+-----------+
|A1A535W556ROBQ|B000063W82|5.0  |451.0        |6.0             |4.057655   |
|AI2G6ZZS0G00Y |B000063W1R|5.0  |5433.0       |37.0            |1.5073457  |
|A1TW9ZGRDQQZ2Y|B0001G6PZC|5.0  |133.0        |7.0             |4.8023205  |
|A1WMVV2AHMONSO|0790747324|5.0  |2874.0       |63.0            |1.3193603  |
|A1BJ4X0Y4SBW40|6303257933|4.0  |1201.0       |83.0            |-1.5725759 |
|A1VZLLDNLURGJB|0790747324|5.0  |183.0        |63.0            |1.006139   |
|A6PSFGFHI80VV |B0095D5454|3.0  |1030.0       |79.0            |-0.4818865 |
|AHIK7BUXFRMT8 |0800103688|4.0  |300.0        |163.0           |1.6777688  |
|A2R70OGNRCIPJM|B002OHDRF2|1.0  |3795.0       |21.0            |1.0026231  |
|A34TI9P66QGT0N|B000UGBOT0|5.0  |4197.0       |78.0            |-0.11888169|

# Q3

In [3]:
# Rows of the validation split that belong to the user whose index is 1.0,
# reduced to the identifier and index columns.
user1 = (
    validation_data
    .where(validation_data['user_id_index'] == 1.0)
    .select('product_id', 'user_id', 'user_id_index', 'product_id_index')
)
user1.show()

# Score those (user, product) pairs with the trained model and list them from
# the highest to the lowest predicted score.
# NOTE(review): only products from this user's validation rows are scored here,
# not the full catalogue -- confirm that this is what the question asks for.
recommendations = model.transform(user1)
recommendations.orderBy(recommendations['prediction'].desc()).show()


+----------+--------------+-------------+----------------+
|product_id|       user_id|user_id_index|product_id_index|
+----------+--------------+-------------+----------------+
|B000063W1R|A2NJO6YE954DBH|          1.0|            37.0|
|B001QB5SCM|A2NJO6YE954DBH|          1.0|            87.0|
|B0000DK4QK|A2NJO6YE954DBH|          1.0|            51.0|
|B000KKQNRO|A2NJO6YE954DBH|          1.0|             3.0|
|B00004CTUN|A2NJO6YE954DBH|          1.0|           409.0|
|B00005Y6Y2|A2NJO6YE954DBH|          1.0|            43.0|
|B0006GAI6E|A2NJO6YE954DBH|          1.0|           210.0|
|B000ZLFALS|A2NJO6YE954DBH|          1.0|             5.0|
|B00005Y6YG|A2NJO6YE954DBH|          1.0|           817.0|
|B00005Y6YM|A2NJO6YE954DBH|          1.0|           366.0|
+----------+--------------+-------------+----------------+

+----------+--------------+-------------+----------------+----------+
|product_id|       user_id|user_id_index|product_id_index|prediction|
+----------+--------------+------

# Q4

In [4]:
# Root mean squared error of the validation predictions, using the existing
# `evaluator` object.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

# Second metric on the same label/prediction columns: mean absolute error.
evaluator_mae = (
    RegressionEvaluator(labelCol="score", predictionCol="prediction")
    .setMetricName("mae")
)
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE) = {mae}")


Root Mean Squared Error (RMSE) = 4.635533404969165
Mean Absolute Error (MAE) = 3.069216533127574
