## Exercise 2

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import stddev
from pyspark.sql.functions import col


# Create a SparkSession
spark = SparkSession.builder.appName('movie-recommender').getOrCreate()

# Load the ratings data
ratings_df = spark.read \
    .format("csv") \
    .option("header", "false") \
    .option("delimiter", "\t") \
    .load("ml-100k/u.data") \
    .toDF("user_id", "movie_id", "rating", "timestamp")

# Convert the user_id column to integer
ratings_df = ratings_df \
    .withColumn("user_id", ratings_df["user_id"].cast("integer")) \
    .withColumn("movie_id", ratings_df["movie_id"].cast("integer")) \
    .withColumn("rating", col("rating").cast("double"))

# Split the dataset into training and test sets
(training_data, test_data) = ratings_df.randomSplit([0.9, 0.1])

# Build the recommendation model using ALS
als = ALS(maxIter=20, regParam=0.01, userCol="user_id", itemCol="movie_id", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training_data)

predictions = model.transform(test_data)
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
stddev_ = predictions.select(stddev('rating')).collect()[0][0]

print("Root-mean-square error = " + str(rmse))
print("Mean absolute error = " + str(mae))
print("Standard deviation of ratings = " + str(stddev_))

#user_recs=model.recommendForAllUsers(10).show(10)

Root-mean-square error = 1.0679537575423779
Mean absolute error = 0.8069672838032932
Standard deviation of ratings = 1.1320886930049068


In [16]:
    from pyspark.sql.functions import explode

    userRecs = model.recommendForAllUsers(10)

    userRecs = userRecs.select('user_id', explode('recommendations').alias('rec'))
    userRecs = userRecs.select('user_id', 'rec.movie_id', 'rec.rating')

    userRecs.show()



+-------+--------+---------+
|user_id|movie_id|   rating|
+-------+--------+---------+
|      1|    1129|  6.70789|
|      1|     361|6.0444226|
|      2|     968|6.2812023|
|      2|    1643|6.1274323|
|      3|    1315| 7.327296|
|      3|    1240|7.0101886|
|      4|    1160|11.327816|
|      4|    1319|10.987577|
|      5|    1368|7.6873527|
|      5|     793|7.1208434|
|      6|    1203|5.8663774|
|      6|     641|5.8135066|
|      7|    1172| 6.261402|
|      7|    1643|6.0340104|
|      8|    1126|7.2294436|
|      8|      57|7.1731596|
|      9|    1184| 9.622095|
|      9|    1643| 9.221812|
|     10|    1643|5.5851517|
|     10|     318|5.1168227|
+-------+--------+---------+
only showing top 20 rows



                                                                                

In [19]:
from pyspark.sql.functions import abs

# Create a SparkSession
spark = SparkSession.builder.appName('movie-recommender').getOrCreate()

# Load the ratings data
ratings_df = spark.read \
    .format("csv") \
    .option("header", "false") \
    .option("delimiter", "\t") \
    .load("ml-100k/u.data") \
    .toDF("user_id", "movie_id", "rating", "timestamp")

# Convert the user_id and movie_id columns to integer
ratings_df = ratings_df \
    .withColumn("user_id", ratings_df["user_id"].cast("integer")) \
    .withColumn("movie_id", ratings_df["movie_id"].cast("integer")) \
    .withColumn("rating", ratings_df["rating"].cast("double"))

# Split the dataset into training and test sets
(training_data, test_data) = ratings_df.randomSplit([0.9, 0.1])

# Build the recommendation model using ALS
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="movie_id", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training_data)

# Evaluate the model by computing the RMSE and MAE on the test data
predictions = model.transform(test_data)
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
stddev_ = predictions.select(stddev('rating')).collect()[0][0]

threshold = 1  # Set the threshold for hit or miss

# Calculate the percentage of correct predictions
correct_predictions = predictions.filter(abs(predictions["rating"] - predictions["prediction"]) <= threshold).count()
total_predictions = predictions.count()
hit_rate = float(correct_predictions) / total_predictions

print("Root-mean-square error = " + str(rmse))
print("Mean absolute error = " + str(mae))
print("Standard deviation of ratings = " + str(stddev_))
print("Hit rate = {:.2%}".format(hit_rate))

# Generate top K movie recommendations for each user
user_recs = model.recommendForAllUsers(10).show(10)


Root-mean-square error = 1.0301542959288181
Mean absolute error = 0.7863825669192229
Standard deviation of ratings = 1.118840716891092
Hit rate = 70.68%




+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{613, 6.4316835}...|
|      3|[{1069, 6.427256}...|
|      5|[{1643, 6.9702296...|
|      6|[{1203, 6.3956494...|
|      9|[{74, 10.248373},...|
|     12|[{534, 7.0750175}...|
|     13|[{1615, 7.0178604...|
|     15|[{1153, 6.428089}...|
|     16|[{916, 6.5682964}...|
|     17|[{267, 10.156244}...|
+-------+--------------------+
only showing top 10 rows



                                                                                