In [47]:
!pip install pyspark



**Dataset**

In [48]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from IPython.display import Markdown

spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()

file_path = "/content/12_movielens_ratings.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True)

In [49]:
df.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [50]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



**Jaccard Similarity**

In [51]:
while True:
    try:
        user_input = int(input("Enter User ID (between 0 and 29): "))

        if 0 <= user_input <= 29:
            break
        else:
            print("Please enter a User ID between 0 and 29.")

    except ValueError:

        print("Invalid input. Please enter a valid integer.")

Enter User ID (between 0 and 29): 0


In [52]:
max_similarity = 0
max_similarity_user = None

df_user_0 = df.filter(df.userId == user_input)

for i in range(0, 30):
    if i == user_input:
        continue

    df_user_1 = df.filter(df.userId == i)
    joined_df = df_user_0.join(df_user_1, "movieId")
    joined_df1 = df_user_0.join(df_user_1, "movieId", how="full_outer")
    total_count = joined_df.count()
    total_count1 = joined_df1.count()
    print("Total common Movie Ratings Between UserID",user_input,"And UserID",i,"(Intersection):", total_count)
    print("Count of all Movie Ratings Between UserID",user_input,"And UserID",i,"(Union):", total_count1)

    jaccardsimilarity = total_count / total_count1
    print("\033[1mJaccard Similarity between UserID", user_input, "and UserID {}:\033[0m {}".format(i, jaccardsimilarity))

    if jaccardsimilarity > max_similarity:
        max_similarity = jaccardsimilarity
        max_similarity_user = i




Total common Movie Ratings Between UserID 0 And UserID 1 (Intersection): 22
Count of all Movie Ratings Between UserID 0 And UserID 1 (Union): 76
[1mJaccard Similarity between UserID 0 and UserID 1:[0m 0.2894736842105263
Total common Movie Ratings Between UserID 0 And UserID 2 (Intersection): 22
Count of all Movie Ratings Between UserID 0 And UserID 2 (Union): 73
[1mJaccard Similarity between UserID 0 and UserID 2:[0m 0.3013698630136986
Total common Movie Ratings Between UserID 0 And UserID 3 (Intersection): 21
Count of all Movie Ratings Between UserID 0 And UserID 3 (Union): 76
[1mJaccard Similarity between UserID 0 and UserID 3:[0m 0.27631578947368424
Total common Movie Ratings Between UserID 0 And UserID 4 (Intersection): 28
Count of all Movie Ratings Between UserID 0 And UserID 4 (Union): 76
[1mJaccard Similarity between UserID 0 and UserID 4:[0m 0.3684210526315789
Total common Movie Ratings Between UserID 0 And UserID 5 (Intersection): 24
Count of all Movie Ratings Between 

In [53]:
print("User ID with the highest Jaccard similarity to UserID", user_input, "is:", max_similarity_user)
print("Highest Jaccard Similarity:", max_similarity)

User ID with the highest Jaccard similarity to UserID 0 is: 14
Highest Jaccard Similarity: 0.43243243243243246


**Cosine Similarity**

In [54]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, sqrt
while True:
    try:
        user_input = int(input("Enter User ID (between 0 and 29): "))


        if 0 <= user_input <= 29:
            break
        else:
            print("Please enter a User ID between 0 and 29.")

    except ValueError:

        print("Invalid input. Please enter a valid integer.")

Enter User ID (between 0 and 29): 0


In [55]:
max_similarity = 0
max_similarity_user = None

df_user0 = df.filter(F.col("userId") == user_input).select("movieId", "rating").withColumnRenamed("rating", "rating_0")

for i in range(0, 30):
  if i == user_input:
    continue
  df_user1 = df.filter(F.col("userId") == i).select("movieId", "rating").withColumnRenamed("rating", "rating_1")
  joined_df = df_user0.join(df_user1, on="movieId")

  result = joined_df.withColumn("rating_product", F.col("rating_0") * F.col("rating_1")) \
                     .groupBy().sum("rating_product") \
                     .collect()[0][0]
  #df_user1.show()
  print("\033[1mΣk=1 xkyk:\033[0m {}".format(result))

  df_user_0 = df.filter(df.userId == user_input)
  df_squared1 = df_user_0.withColumn("rating_squared", col("rating") ** 2)
  df_grouped1 = df_squared1.groupBy("userId").agg(expr("sqrt(sum(rating_squared))").alias("sqrt_sum_ratings"))
  results1=df_grouped1.collect()
  df_grouped1.show()
  sqrt_sum_ratings_user00 = results1[0]["sqrt_sum_ratings"]
  print('√Σk=1 xk^2:',sqrt_sum_ratings_user00)

  df_user_1 = df.filter(df.userId == i)
  df_squared1 = df_user_1.withColumn("rating_squared", col("rating") ** 2)
  df_grouped1 = df_squared1.groupBy("userId").agg(expr("sqrt(sum(rating_squared))").alias("sqrt_sum_ratings"))
  results1 = df_grouped1.collect()
  sqrt_sum_ratings_user01 = results1[0]["sqrt_sum_ratings"]
  print('√Σk=1 yk^2:',sqrt_sum_ratings_user01)
  df_grouped1.show()
  a = sqrt_sum_ratings_user00 * sqrt_sum_ratings_user01
  print("\033[1m√Σk=1 xk^2 * yk^2:\033[0m {}".format(a))

  cosine = result / a

  print("\033[1mCosine similarity between UserID", user_input, "and UserID {}:\033[0m {}".format(i, cosine))
  print("\n")
  if cosine > max_similarity:
    max_similarity = cosine
    max_similarity_user = i



[1mΣk=1 xkyk:[0m 64.0
+------+-----------------+
|userId| sqrt_sum_ratings|
+------+-----------------+
|     0|11.40175425099138|
+------+-----------------+

√Σk=1 xk^2: 11.40175425099138
√Σk=1 yk^2: 12.328828005937952
+------+------------------+
|userId|  sqrt_sum_ratings|
+------+------------------+
|     1|12.328828005937952|
+------+------------------+

[1m√Σk=1 xk^2 * yk^2:[0m 140.5702671264446
[1mCosine similarity between UserID 0 and UserID 1:[0m 0.45528831457957786


[1mΣk=1 xkyk:[0m 75.0
+------+-----------------+
|userId| sqrt_sum_ratings|
+------+-----------------+
|     0|11.40175425099138|
+------+-----------------+

√Σk=1 xk^2: 11.40175425099138
√Σk=1 yk^2: 17.05872210923198
+------+-----------------+
|userId| sqrt_sum_ratings|
+------+-----------------+
|     2|17.05872210923198|
+------+-----------------+

[1m√Σk=1 xk^2 * yk^2:[0m 194.49935732541636
[1mCosine similarity between UserID 0 and UserID 2:[0m 0.38560538724309357


[1mΣk=1 xkyk:[0m 46.0
+------+-

In [56]:
print("User ID with the highest Cosine similarity to UserID", user_input, "is:", max_similarity_user)
print("Highest Cosine Similarity:", max_similarity)

User ID with the highest Cosine similarity to UserID 0 is: 1
Highest Cosine Similarity: 0.45528831457957786


**Centered Cosine Similarity**

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, sqrt

csv_file_path = "/content/12_movielens_ratings.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
while True:
    try:
        user_input = int(input("Enter User ID (between 0 and 29): "))


        if 0 <= user_input <= 29:
            break
        else:
            print("Please enter a User ID between 0 and 20.")

    except ValueError:

        print("Invalid input. Please enter a valid integer.")

Enter User ID (between 0 and 29): 0


In [58]:
max_similarity = 0
max_similarity_user = None
average_result = df.filter(col("userId") == user_input).agg(avg("rating").alias("rating_avg"))
average_result.show()
average = average_result.collect()[0]["rating_avg"]
print("\nAverage rating for userId",user_input, average)
df_modified = df.withColumn("rating", col("rating") - average).filter(col("userId") == user_input)
print("\nDataFrame with ratings subtracted by the average for userId",user_input)


df_modified.show()
for i in range(0,30):
  if i == user_input:
    continue
  average_result1 = df.filter(col("userId") == i).agg(avg("rating").alias("rating_avg"))
  average1 = average_result1.collect()[0]["rating_avg"]

  print("\033[1m\nAverage rating for userId {}:\033[0m {}".format(i, average1))

  df_modified1 = df.withColumn("rating", col("rating") - average1).filter(col("userId") == i)

  print("\033[1m\nDataFrame with ratings subtracted by the average for userId {}:\033[0m".format(i))

  df_modified1.show()
  df_combined = df_modified.union(df_modified1)

  print("\033[1m\nCombined DataFrame:\033[0m")

  df_combined.show()



  df_user0 = df_combined.filter(F.col("userId") == user_input).select("movieId", "rating").withColumnRenamed("rating", "rating_0")
  df_user1 = df_combined.filter(F.col("userId") == i).select("movieId", "rating").withColumnRenamed("rating", "rating_1")
  joined_df = df_user0.join(df_user1, on="movieId")
  result = joined_df.withColumn("rating_product", F.col("rating_0") * F.col("rating_1")) \
                 .groupBy().sum("rating_product") \
                 .collect()[0][0]

  print("\033[1mΣk=1 xkyk:\033[0m {}".format(result))


  df_user_0 = df_combined.filter(df_combined.userId == user_input)
  df_squared1 = df_user_0.withColumn("rating_squared", col("rating") ** 2)
  df_grouped1 = df_squared1.groupBy("userId").agg(expr("sqrt(sum(rating_squared))").alias("sqrt_sum_ratings"))
  results1=df_grouped1.collect()
  sqrt_sum_ratings_user00 = results1[0]["sqrt_sum_ratings"]
  df_grouped1.show()
  print('√Σk=1 xk^2:',sqrt_sum_ratings_user00)
  df_user_1 = df_combined.filter(df_combined.userId == i)
  df_squared1 = df_user_1.withColumn("rating_squared", col("rating") ** 2)
  df_grouped1 = df_squared1.groupBy("userId").agg(expr("sqrt(sum(rating_squared))").alias("sqrt_sum_ratings"))
  results1=df_grouped1.collect()
  sqrt_sum_ratings_user01 = results1[0]["sqrt_sum_ratings"]
  df_grouped1.show()
  print('√Σk=1 yk^2:',sqrt_sum_ratings_user01)
  a=sqrt_sum_ratings_user00 * sqrt_sum_ratings_user01

  print("√Σk=1 xk^2 * Σk=1 xkyk",a)
  cosine=result/a
  print("Centered cosine similarity between UserID",user_input,"and UserID",i,cosine)
  if cosine > max_similarity:
    max_similarity = cosine
    max_similarity_user = i

+------------------+
|        rating_avg|
+------------------+
|1.4285714285714286|
+------------------+


Average rating for userId 0 1.4285714285714286

DataFrame with ratings subtracted by the average for userId 0
+-------+-------------------+------+
|movieId|             rating|userId|
+-------+-------------------+------+
|      2| 1.5714285714285714|     0|
|      3|-0.4285714285714286|     0|
|      5| 0.5714285714285714|     0|
|      9|  2.571428571428571|     0|
|     11|-0.4285714285714286|     0|
|     12| 0.5714285714285714|     0|
|     15|-0.4285714285714286|     0|
|     17|-0.4285714285714286|     0|
|     19|-0.4285714285714286|     0|
|     21|-0.4285714285714286|     0|
|     23|-0.4285714285714286|     0|
|     26| 1.5714285714285714|     0|
|     27|-0.4285714285714286|     0|
|     28|-0.4285714285714286|     0|
|     29|-0.4285714285714286|     0|
|     30|-0.4285714285714286|     0|
|     31|-0.4285714285714286|     0|
|     34|-0.4285714285714286|     0|
|     

In [59]:
print("User ID with the highest Centered Cosine similarity to UserID", user_input, "is:", max_similarity_user)
print("Highest Centered Cosine Similarity:", max_similarity)

User ID with the highest Centered Cosine similarity to UserID 0 is: 28
Highest Centered Cosine Similarity: 0.35632617755129736


In [67]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.ml.evaluation import RegressionEvaluator
file_path = "/content/12_movielens_ratings.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
(training, test) = df.randomSplit([0.8, 0.2])
mean_ratings = training.groupBy("movieId").agg(avg("rating").alias("mean_rating"))
predictions = test.join(mean_ratings, "movieId", "left_outer")
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="mean_rating")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

top_predictions = mean_ratings.orderBy("mean_rating", ascending=False).limit(10)
precision_at_10 = top_predictions \
    .join(df.select("movieId", "rating"), "movieId") \
    .rdd.map(lambda x: abs(x[2] - x[1]) <= 0.5) \
    .reduce(lambda x, y: x + y) / top_predictions.count()
print(f"Precision at Top 10: {precision_at_10}")



Root Mean Squared Error (RMSE): 1.2414524134909568
Precision at Top 10: 2.4


In [73]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.ml.evaluation import RegressionEvaluator
file_path = "/content/12_movielens_ratings.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df = df.withColumn("label", (col("rating") >= 3.0).cast("double"))
feature_cols = ["userId", "movieId"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)
(training, test) = df.randomSplit([0.8, 0.2])
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(training)
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"Area under ROC curve: {roc_auc}")
precision = predictions.filter("prediction = label").count() / predictions.count()
print(f"Precision: {precision}")
coverage = predictions.select("movieId").distinct().count() / df.select("movieId").distinct().count()
print(f"Coverage: {coverage}")


Area under ROC curve: 0.5909657320872275
Precision: 0.781021897810219
Coverage: 0.96
