In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.appName('recommender').getOrCreate()



In [2]:
ratings = spark.read.option("inferSchema", True).option("header", True).csv("ratings.csv").limit(1000)
ratings.show(2)


+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
+------+-------+------+----------+
only showing top 2 rows



In [3]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [4]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index").fit(ratings)
    for column in ["userId", "movieId"]
]

pipeline = Pipeline(stages=indexers)
ratings_indexed = pipeline.fit(ratings).transform(ratings)

training_data,validation_data = ratings_indexed.randomSplit([8.0,2.0])

In [5]:
als = ALS(userCol="userId_index",itemCol="movieId_index",ratingCol="rating",rank=5,maxIter=3,regParam=0.01,coldStartStrategy="drop")

In [6]:

model = als.fit(training_data)

In [7]:
evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

model = als.fit(training_data)
predictions=model.transform(validation_data)
predictions.show(10,False)


+------+-------+------+----------+------------+-------------+-----------+
|userId|movieId|rating|timestamp |userId_index|movieId_index|prediction |
+------+-------+------+----------+------------+-------------+-----------+
|4     |5995   |5.0   |1442455622|12.0        |176.0        |3.380457   |
|4     |50872  |5.0   |1442455546|12.0        |70.0         |2.9350915  |
|4     |79091  |5.0   |1442456126|12.0        |187.0        |3.382306   |
|4     |109487 |4.5   |1442684878|12.0        |86.0         |-0.7537474 |
|7     |3      |3.0   |974517393 |1.0         |138.0        |-0.62056565|
|7     |62     |3.0   |974519061 |1.0         |78.0         |-1.5140789 |
|7     |296    |3.0   |974518074 |1.0         |3.0          |1.7705758  |
|7     |349    |4.0   |974520676 |1.0         |58.0         |1.5281651  |
|7     |527    |4.0   |974517903 |1.0         |27.0         |-0.49645248|
|7     |585    |3.0   |974520261 |1.0         |71.0         |2.2342389  |
+------+-------+------+----------+----

In [8]:
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")


Root Mean Squared Error (RMSE) = 3.9951741883579195


# Entity Resolution

In [9]:
parsed = spark.read.option("inferSchema", True).option("header", True).csv("ratings.csv").limit(1000)


In [10]:
from pyspark.sql.functions import avg, count

# Group by userId and movieId, aggregate ratings
aggregated_ratings = ratings.groupBy("userId", "movieId") \
    .agg(avg("rating").alias("average_rating"), count("rating").alias("rating_count"))

# Join aggregated data back to original DataFrame
merged_ratings = ratings.join(aggregated_ratings, ["userId", "movieId"], "left_outer")

# Show the merged DataFrame
merged_ratings.show(5)


+------+-------+------+----------+--------------+------------+
|userId|movieId|rating| timestamp|average_rating|rating_count|
+------+-------+------+----------+--------------+------------+
|     1|      1|   4.0|1225734739|           4.0|           1|
|     1|    110|   4.0|1225865086|           4.0|           1|
|     1|    158|   4.0|1225733503|           4.0|           1|
|     1|    260|   4.5|1225735204|           4.5|           1|
|     1|    356|   5.0|1225735119|           5.0|           1|
+------+-------+------+----------+--------------+------------+
only showing top 5 rows



In [11]:
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index").fit(ratings)
    for column in ["userId", "movieId"]
]

pipeline = Pipeline(stages=indexers)
ratings_indexed = pipeline.fit(ratings).transform(ratings)

training_data,validation_data = ratings_indexed.randomSplit([8.0,2.0])

In [12]:
model = als.fit(training_data)

In [13]:
evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

model = als.fit(training_data)
predictions=model.transform(validation_data)
predictions.show(10,False)

+------+-------+------+----------+------------+-------------+-----------+
|userId|movieId|rating|timestamp |userId_index|movieId_index|prediction |
+------+-------+------+----------+------------+-------------+-----------+
|4     |1097   |3.5   |1442455733|12.0        |15.0         |-0.89528036|
|4     |2858   |4.0   |1442455646|12.0        |18.0         |5.629303   |
|4     |50872  |5.0   |1442455546|12.0        |70.0         |2.6496496  |
|7     |34     |3.0   |974523039 |1.0         |20.0         |-1.2591667 |
|7     |39     |2.0   |974519479 |1.0         |64.0         |-0.90530145|
|7     |225    |4.0   |974521503 |1.0         |51.0         |0.18725494 |
|7     |349    |4.0   |974520676 |1.0         |58.0         |-1.2070689 |
|7     |551    |3.0   |974517144 |1.0         |29.0         |-3.589035  |
|7     |1246   |4.0   |974522217 |1.0         |96.0         |1.193851   |
|7     |1721   |4.0   |974519617 |1.0         |117.0        |-0.61102957|
+------+-------+------+----------+----

In [14]:
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")


Root Mean Squared Error (RMSE) = 4.317241908222907
