In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ContentBasedFiltering").config("spark.sql.execution.arrow.maxRecordsPerBatch", 50).getOrCreate()
data = spark.read.option("header", "true").csv("new_data.csv")
data = data.na.drop(subset=["tags"])
tokenizer = Tokenizer(inputCol="tags", outputCol="words")
data = tokenizer.transform(data)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
data = remover.transform(data)
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures")
data = hashingTF.transform(data)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(data)
data = idfModel.transform(data)
assembler = VectorAssembler(inputCols=["features"], outputCol="feature_vector")
data = assembler.transform(data)
normalizer = Normalizer(inputCol="feature_vector", outputCol="normalized_features")
data = normalizer.transform(data)

selected_data = data.select("name", "normalized_features")

def get_top_movies(movie_name, top_n):
    movie_data = selected_data.filter(col("name") == movie_name)
    cross_joined_data = selected_data.crossJoin(movie_data.withColumnRenamed("name", "movie_name_2").withColumnRenamed("normalized_features", "normalized_features_2"))
    def cosine_similarity(v1, v2):
        return float(v1.dot(v2) / (v1.norm(2) * v2.norm(2)))
    cosine_similarity_udf = spark.udf.register("cosine_similarity", cosine_similarity)
    result = cross_joined_data.withColumn("similarity", cosine_similarity_udf("normalized_features", "normalized_features_2")).filter(col("name") != col("movie_name_2"))
    result = result.withColumn("similarity", col("similarity").cast("double"))
    result = result.withColumn("similarity", col("similarity").cast("decimal(10,5)"))
    recommendations = (
        result.select(col("name").alias("movie_name"), "similarity")
        .orderBy(col("similarity").desc())
        .limit(top_n)
    )
    print(f"Recommendations for movie {movie_name} are:")
    recommendations.show(100, truncate=False)


                                                                                

In [13]:
get_top_movies("The Spectacular Spider-Man", 100)

23/11/30 23:13:47 WARN SimpleFunctionRegistry: The function cosine_similarity replaced a previously registered function.


Recommendations for movie The Spectacular Spider-Man are:


23/11/30 23:13:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/11/30 23:13:52 WARN DAGScheduler: Broadcasting large task binary with size 18.7 MiB
[Stage 24:>                                                         (0 + 1) / 1]

+---------------------------------------------------------------------------+----------+
|movie_name                                                                 |similarity|
+---------------------------------------------------------------------------+----------+
|DC Super Hero Girls                                                        |0.18529   |
|Spider-Man                                                                 |0.17049   |
|Spider-Man                                                                 |0.15340   |
|The Avengers: United They Stand                                            |0.14020   |
|The Thundermans                                                            |0.13975   |
|Stripperella                                                               |0.13836   |
|Me, Myself & I                                                             |0.13280   |
|Spider-Man and His Amazing Friends                                         |0.12166   |
|Heroes              

                                                                                