In [6]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

def computeCosineSimilarity(data, min_pairs=1):
    """Compute the cosine similarity between the rating vectors of movie pairs.

    Args:
        data: Spark DataFrame with one row per co-rating and the columns
            ``movie1``, ``movie2``, ``rating1`` and ``rating2``.
        min_pairs: Minimum number of input rows a (movie1, movie2) pair must
            have to be kept. The default of 1 keeps every pair, which is the
            historical behaviour; raise it to drop pairs whose score rests on
            only a handful of co-ratings.

    Returns:
        Spark DataFrame with the columns ``movie1``, ``movie2`` and ``score``.
        ``score`` is NULL when the denominator is zero (or NULL), i.e. when
        the similarity is undefined.
    """
    aggregated = (data
        .withColumn("xx", F.col("rating1") * F.col("rating1"))
        .withColumn("yy", F.col("rating2") * F.col("rating2"))
        .withColumn("xy", F.col("rating1") * F.col("rating2"))
        .groupBy("movie1", "movie2")
        .agg(F.sum("xy").alias("numerator"),
             (F.sqrt(F.sum("xx")) * F.sqrt(F.sum("yy"))).alias("denominator"),
             F.count(F.lit(1)).alias("numPairs"))
    )

    # Every group has at least one row, so only filter when it can matter.
    if min_pairs > 1:
        aggregated = aggregated.filter(F.col("numPairs") >= min_pairs)

    # Guard the division: a bare "/" yields NULL on a zero denominator only in
    # non-ANSI mode and raises a divide-by-zero error when ANSI mode is on.
    # F.when without otherwise() keeps the NULL result in both modes.
    score = F.when(F.col("denominator") != 0,
                   F.col("numerator") / F.col("denominator"))

    return (aggregated
        .withColumn("score", score)
        .select("movie1", "movie2", "score")
    )

def main():
    """Prompt for a movie ID and print the ten movies most similar to it.

    Reads ``ml-100k/u.item`` (pipe-separated; column 0 is the movie ID and
    column 1 the title) and ``ml-100k/u.data`` (tab-separated; columns 0-2 are
    user ID, movie ID and rating), builds every pair of movies rated by the
    same user, scores the pairs with ``computeCosineSimilarity`` and prints
    the ten highest-scoring pairs that involve the requested movie.

    Returns early (after printing an error) when the input is not an integer
    or when the movie ID is not present in ``u.item``. The Spark session is
    always stopped, even if a Spark job fails.
    """
    banner = "=" * 50
    print(banner)
    print("MOVIE RECOMMENDATION SYSTEM")
    print(banner)

    # Keep the try body down to the single call that can raise ValueError.
    try:
        movie_id = int(input("Please enter a movie ID: "))
    except ValueError:
        print("Error: Please enter a valid integer for movie ID")
        return
    print(f"\nProcessing recommendations for movie ID: {movie_id}...")

    spark = SparkSession.builder.appName("MovieRecommendations").getOrCreate()
    try:
        # Load data
        print("Loading movie data...")
        movies_df = (spark.read
            .option("sep", "|")
            .option("charset", "ISO-8859-1")
            .csv("ml-100k/u.item")
            .select(F.col("_c0").cast(IntegerType()).alias("movieID"),
                    F.col("_c1").alias("movieTitle"))
        )

        print("Loading ratings data...")
        ratings_df = (spark.read
            .option("sep", "\t")
            .csv("ml-100k/u.data")
            .select(F.col("_c0").cast(IntegerType()).alias("userID"),
                    F.col("_c1").cast(IntegerType()).alias("movieID"),
                    F.col("_c2").cast(IntegerType()).alias("rating"))
        )

        # Check if movie exists. A single first() both answers that question
        # and fetches the title, instead of a count() job followed by a
        # second, identical lookup.
        target_row = (movies_df
            .filter(F.col("movieID") == movie_id)
            .select("movieTitle")
            .first()
        )
        if target_row is None:
            print(f"\nError: Movie with ID {movie_id} not found in the database!")
            print("Please enter a valid movie ID.")
            return
        movie_title = target_row["movieTitle"]

        print(f"Movie found: {movie_title}")
        print("Computing similarities...")

        # Compute similarities. The "<" on movieID keeps each unordered pair
        # once and excludes a movie being paired with itself.
        movie_pairs = (ratings_df.alias("r1")
            .join(ratings_df.alias("r2"),
                  (F.col("r1.userID") == F.col("r2.userID")) &
                  (F.col("r1.movieID") < F.col("r2.movieID")))
            .select(F.col("r1.movieID").alias("movie1"),
                    F.col("r2.movieID").alias("movie2"),
                    F.col("r1.rating").alias("rating1"),
                    F.col("r2.rating").alias("rating2"))
        )

        similarities = computeCosineSimilarity(movie_pairs)

        # Find similar movies. The requested movie can sit on either side of
        # a pair, so pick whichever side is the *other* movie.
        # NOTE(review): scores are not filtered by how many users co-rated a
        # pair; a pair with a single co-rating always scores 1.0, so such
        # pairs can crowd out better-supported matches in the top 10.
        print("Finding top recommendations...")
        top_similar = (similarities
            .filter((F.col("movie1") == movie_id) | (F.col("movie2") == movie_id))
            .withColumn("similar_movie",
                        F.when(F.col("movie1") == movie_id, F.col("movie2"))
                         .otherwise(F.col("movie1")))
            .join(movies_df, F.col("similar_movie") == F.col("movieID"))
            .select("movieTitle", "score")
            .orderBy(F.desc("score"))
            .limit(10)
            # Two actions follow (show + collect); cache so the expensive
            # self-join and aggregation are not executed twice.
            .cache()
        )

        print("\n" + banner)
        print(f"TOP 10 MOVIES SIMILAR TO: {movie_title}")
        print(banner)
        top_similar.show(truncate=False)

        print("\nRECOMMENDATIONS:")
        recommendations = top_similar.collect()
        for i, rec in enumerate(recommendations, 1):
            print(f"{i}. {rec['movieTitle']} (Similarity Score: {rec['score']:.4f})")
    finally:
        # Release the session on every exit path, including failures.
        spark.stop()

    print("\nProcessing complete!")

# Run the interactive recommender only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

MOVIE RECOMMENDATION SYSTEM

Processing recommendations for movie ID: 68...
Loading movie data...
Loading ratings data...
Movie found: Crow, The (1994)
Computing similarities...
Finding top recommendations...

TOP 10 MOVIES SIMILAR TO: Crow, The (1994)
+---------------------------------+------------------+
|movieTitle                       |score             |
+---------------------------------+------------------+
|Across the Sea of Time (1995)    |1.0000000000000002|
|Tango Lesson, The (1997)         |1.0000000000000002|
|Germinal (1993)                  |1.0000000000000002|
|Killer (Bulletproof Heart) (1994)|1.0000000000000002|
|Shiloh (1997)                    |1.0000000000000002|
|Afterglow (1997)                 |1.0000000000000002|
|To Be or Not to Be (1942)        |1.0               |
|Perfect Candidate, A (1996)      |1.0               |
|Unzipped (1995)                  |1.0               |
|Angel and the Badman (1947)      |1.0               |
+-------------------------------