In [1]:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col


In [2]:

# -----------------------------
# Start Spark
# -----------------------------
spark = SparkSession.builder \
    .appName("ALS_Similar_Movies_100_SparkML") \
    .getOrCreate()

# -----------------------------
# Load movies.dat
# -----------------------------
movies = {}
with open("movies.dat", "r", encoding="ISO-8859-1") as f:
    for line in f:
        fields = line.strip().split("::")
        movies[int(fields[0])] = fields[1]



In [3]:
# -----------------------------
# Load ratings.dat
# -----------------------------
ratings_rdd = spark.sparkContext.textFile("ratings.dat")

ratings_df = ratings_rdd.map(lambda x: x.split("::")) \
    .map(lambda f: (int(f[0]), int(f[1]), float(f[2]))).toDF(["userId", "movieId", "rating"])

# -----------------------------
# Train ALS model
# -----------------------------
als = ALS(
    maxIter=10,
    regParam=0.05,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"
)
model = als.fit(ratings_df)

# -----------------------------
# Prepare item factors for LSH
# -----------------------------
itemFactors = model.itemFactors.withColumnRenamed("id", "movieId")

# Convert feature list to ML Vector type (required by LSH)
itemFactors = itemFactors.rdd.map(
    lambda row: (row.movieId, Vectors.dense(row.features))
).toDF(["movieId", "features"])



In [4]:
# -----------------------------
# Build LSH Model (Spark ML)
# -----------------------------
lsh = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",
    bucketLength=2.0
)

lsh_model = lsh.fit(itemFactors)

# -----------------------------
# Find similar movies (100% Spark ML)
# -----------------------------
def findSimilarMovies(movieId, topN=10):

    target = itemFactors.filter(col("movieId") == movieId)

    if target.count() == 0:
        print("Movie not found:", movieId)
        return

    similar = lsh_model.approxNearestNeighbors(
        dataset=itemFactors,
        key=target.first().features,
        numNearestNeighbors=topN + 1 # Get one more to exclude itself later
    )

    # Remove itself and limit to topN
    similar = similar.filter(col("movieId") != movieId)

    # Collect results
    results = similar.orderBy("distCol").limit(topN).collect()

    print(f"\nTop {topN} movies similar to: {movies[movieId]}\n")

    for row in results:
        name = movies.get(row.movieId, "Unknown")
        print(f"{name}  (distance={row.distCol:.4f})")




In [5]:
# -----------------------------
# Test
# -----------------------------
findSimilarMovies(1, topN=10)

spark.stop()


Top 10 movies similar to: Movie 1 (2001)

Movie 13 (2003)  (distance=0.7588)
Movie 19 (2009)  (distance=1.5993)
Movie 96 (2006)  (distance=2.2107)
Movie 99 (2009)  (distance=2.2764)
Movie 98 (2008)  (distance=2.5893)
Movie 9 (2009)  (distance=2.6385)
Movie 53 (2003)  (distance=2.6593)
Movie 33 (2003)  (distance=2.7709)
Movie 8 (2008)  (distance=2.7831)
Movie 58 (2008)  (distance=2.7831)
