In [0]:
import os

# Authenticate to the ADLS Gen2 storage account used by every read/write in this notebook.
# The account key is read from the environment instead of being pasted into the notebook
# source (an empty value, as before, is used if the variable is not set).
# NOTE(review): on Databricks the cluster env var can reference a secret, e.g.
#   AZURE_STORAGE_ACCOUNT_KEY={{secrets/<scope>/<key>}} — TODO: configure scope/key.
spark.conf.set(
    "fs.azure.account.key.arulrajgopalshare.dfs.core.windows.net",
    os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", ""),
)

# Optional tuning knobs, disabled by default; uncomment to experiment.
# spark.conf.set("spark.sql.adaptive.enabled", "false")
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# spark.conf.set("spark.sql.shuffle.partitions", 8)
# spark.conf.set("spark.sql.files.maxPartitionBytes", "500m")


# Data preparation

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType
from pyspark.sql.functions import col,sum as spark_sum, sqrt, desc,count

# Step 1: read the ratings with an explicit schema (no JSON schema inference pass).
path = "abfss://kaninipro@arulrajgopalshare.dfs.core.windows.net"
MAX_ITEM_ID = 5000  # only items with item_id <= this take part, bounding the self-join below

schema = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("user_id", LongType(), True),
    StructField("rating", DoubleType(), True),
])

ratings_df = (
    spark.read.format("json")
    .schema(schema)
    .load(f"{path}/movielens_2gb/ratings.json")
    .filter(col("item_id") <= MAX_ITEM_ID)
    # Drop unrated rows: a null rating on one side of a pair would be skipped by sum_xy and
    # by one of sum_xx/sum_yy yet still counted in the other sum and in numPairs, so the
    # cosine below would be computed over mismatched rating vectors.
    .filter(col("rating").isNotNull())
)


# Step 2: self-join on user_id to get every pair of items rated by the same user.
# item1 < item2 keeps each unordered pair once and excludes pairing an item with itself.
pair_ratings = (
    ratings_df.alias("ratings1")
    .join(
        ratings_df.alias("ratings2"),
        (col("ratings1.user_id") == col("ratings2.user_id"))
        & (col("ratings1.item_id") < col("ratings2.item_id")),
    )
    .select(
        col("ratings1.item_id").alias("item1"),
        col("ratings2.item_id").alias("item2"),
        col("ratings1.rating").alias("rating1"),
        col("ratings2.rating").alias("rating2"),
    )
)

# Step 3: per item pair, aggregate the terms of the cosine similarity
#   score = sum_xy / (sqrt(sum_xx) * sqrt(sum_yy))
# (the score itself is computed in the processing stage), plus the co-rating count.
pair_stats = (
    pair_ratings
    .groupBy("item1", "item2")
    .agg(
        spark_sum(col("rating1") * col("rating2")).alias("sum_xy"),
        spark_sum(col("rating1") * col("rating1")).alias("sum_xx"),
        spark_sum(col("rating2") * col("rating2")).alias("sum_yy"),
        count("*").alias("numPairs"),
    )
)

# Persist as a parquet-backed table at an explicit storage location.
pair_stats.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", f"{path}/test_path/pair_stats") \
    .saveAsTable("pair_stats")

In [0]:
# Movie metadata: parse with an explicit schema, then keep only the two columns
# needed to put a title on an item id.
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("starring", StringType(), True),
        StructField("dateAdded", StringType(), True),
        StructField("avgRating", DoubleType(), True),
        StructField("imdbId", StringType(), True),
        StructField("item_id", IntegerType(), True),
    ]
)

movie_data = (
    spark.read.format("json")
    .schema(schema)
    .load(f"{path}/movielens_2gb/metadata.json")
    .select("item_id", "title")
)

# Persist as a parquet-backed table at an explicit storage location.
(
    movie_data.write
    .format("parquet")
    .mode("overwrite")
    .option("path", f"{path}/test_path/movie_data")
    .saveAsTable("movie_data")
)

# Data processing

In [0]:
# Optional tuning knobs for the processing stage; uncomment to experiment.
# spark.conf.set("spark.sql.shuffle.partitions", 8)
# spark.conf.set("spark.sql.files.maxPartitionBytes", "500m")

In [0]:
# Recursively remove any previous contents of test_path/similar_movies
# (presumably the storage location for the similar_movies table written below).
dbutils.fs.rm(
    "abfss://kaninipro@arulrajgopalshare.dfs.core.windows.net/test_path/similar_movies",
    recurse=True,
)

In [0]:
from pyspark.sql.functions import col, sqrt, desc, when

# This section reloads the tables written by the data-preparation cells, so it can run
# on its own after a restart.
pair_stats = spark.table("pair_stats")
movie_data = spark.table("movie_data")
path = "abfss://kaninipro@arulrajgopalshare.dfs.core.windows.net"

MIN_PAIRS = 10    # keep item pairs with more than this many co-ratings (numPairs)
MIN_SCORE = 0.95  # keep item pairs whose cosine similarity exceeds this

# Cosine similarity: sum_xy / (sqrt(sum_xx) * sqrt(sum_yy)).
# A zero norm product yields a null score instead of a division error
# (Spark raises on division by zero when ANSI mode is enabled).
norm_product = sqrt(col("sum_xx")) * sqrt(col("sum_yy"))
pair_scores = pair_stats.withColumn(
    "score",
    when(norm_product > 0, col("sum_xy") / norm_product),
)

filtered_results = (
    pair_scores
    .filter((col("numPairs") > MIN_PAIRS) & (col("score") > MIN_SCORE))
    .select("item1", "item2", "numPairs", "score")
)

# Attach a title to each side of the pair, then sort. The sort must come after the joins:
# joins are not guaranteed to preserve input order, so sorting earlier is wasted work.
# item1/item2 break score ties so the order is deterministic.
movie_name_attached_df = (
    filtered_results.alias("A")
    .join(movie_data.alias("B"), col("A.item1") == col("B.item_id"))
    .join(movie_data.alias("C"), col("A.item2") == col("C.item_id"))
    .selectExpr("A.*", "B.title as title1", "C.title as title2")
    .orderBy(desc("score"), "item1", "item2")
)

# Write under the shared storage root, next to pair_stats and movie_data
# (previously a leftover placeholder path was used here).
movie_name_attached_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", f"{path}/test_path/similar_movies") \
    .saveAsTable("similar_movies")



In [0]:
%sql
-- A table read has no inherent row order, so sort explicitly to list the most similar pairs first.
select * from similar_movies order by score desc, item1, item2